From 2d10f4524edd3bb4d628f791ba0b41b510b60698 Mon Sep 17 00:00:00 2001 From: John Schock Date: Fri, 9 Jul 2021 12:52:42 -0700 Subject: [PATCH] Refactor OmniCache (#252) Refactor Omnicache to improve performance, handle tags better, and remove internal yaml config file. Omnicache version 0.11 design notes: 1. Only a single remote per URL will be permitted. 2. Remote names inside omnicache will be UUIDs generated when a new URL is added. 3. Users may specify names for remotes, but they will be treated simply as "display names" without git significance. 4. As many objects as possible will be cached for any given remote; so all remotes will fetch tags. 4a. To avoid tag name collisions between remotes, tags will be fetched into per-remote namespaces. 5. Older omnicaches will be updated to meet the above design points when first encountered by the script. 5a. This implementation takes care that the underlying git construction is compatible (but not performant) when an older omnicache interacts with it. Older omnicache should not crash when encountering a newer omnicache directory but may take a lot of slow and unecessary actions to "re-initialize" it. --- .cspell.json | 3 +- edk2toolext/omnicache.py | 742 +++++++++++++++------------- edk2toolext/tests/test_omnicache.py | 496 ++++++++++++++++--- 3 files changed, 820 insertions(+), 421 deletions(-) diff --git a/.cspell.json b/.cspell.json index 3c21092c..6be0b4e7 100644 --- a/.cspell.json +++ b/.cspell.json @@ -123,6 +123,7 @@ "dbxupdate", "FileFlagsMask", "markdownlint", - "xffd" + "xffd", + "rtags" ] } diff --git a/edk2toolext/omnicache.py b/edk2toolext/omnicache.py index a059cef5..35f608c1 100644 --- a/edk2toolext/omnicache.py +++ b/edk2toolext/omnicache.py @@ -10,252 +10,366 @@ import argparse import datetime import yaml +import uuid from io import StringIO from edk2toolext import edk2_logging -from edk2toollib import utility_functions -from edk2toolext.edk2_git import Repo - - -class OmniCacheConfig(): - ''' - class to manage the Internal Omnicache config file. - Load, Save, Version check, etc. - ''' - - CONFIG_VERSION = 1 +from edk2toollib.utility_functions import RunCmd + +# Omnicache version 0.11 design notes: +# 1. Only a single remote per URL will be permitted. +# 2. Remote names inside omnicache will be UUIDs generated when a new URL is added. +# 3. Users may specify names for remotes, but they will be treated simply as "display names" without git significance. +# 4. As many objects as possible will be cached for any given remote; so all remotes will fetch tags. +# 4a. To avoid tag name collisions between remotes, tags will be fetched into per-remote namespaces. +# 5. Older omnicaches will be updated to meet the above design points when first encountered by the script. +# 5a. This implementation takes care that the underlying git construction is compatible (but not performant) when an +# older omnicache interacts with it. Older omnicache should not crash when encountering a newer omnicache directory +# but may take a lot of slow and unecessary actions to "re-initialize" it. + +OMNICACHE_VERSION = "0.11" + +# Prior to version 0.11, an "omnicache.yaml" file in the root of the omnicache repo was used to manage the cache +# configuration. Starting with 0.11, git config entries are used directly. If the file below is present, it will be +# removed to force re-initialization of the omnicache by older omnicache instances. (e.g. if an old branch with a prior +# implementation of omnicache is being built and uses the same cache directory) +PRE_0_11_OMNICACHE_FILENAME = "omnicache.yaml" + + +class Omnicache(): + """ Class for managing an omnicache instance""" + + def __init__(self, cachepath, create=False, convert=True): + """Initializes an omnicache + cachepath - path to the omnicache git repo + create - if True, and omnicache doesn't already exist at cachepath, create a new omnicache instance + convert - if True, and old (but compatible) omnicache already exists, at cachepath, convert it to this + version of Omnicache + """ + self.path = cachepath + self._InvalidateUrlLookupCache() + + (valid, isConversionCandidate) = self._ValidateOmnicache() + if (not valid): + if create and not isConversionCandidate: + self._InitOmnicache() + elif (convert and isConversionCandidate): + self._ConvertOmnicache() + else: + logging.critical("Omnicache at {0} not valid, and cannot create/convert.".format(cachepath)) + raise RuntimeError("Invalid cache path: {0}".format(cachepath)) + + def _ValidateOmnicache(self): + """Validates whether self.path has a valid omnicache instance. + Returns: (valid, convertible) where "valid" is True if a compatible Omnicache exists at self.path, and + "convertible" is True if an older Omnicache that can be converted exists at self.path. + """ + logging.info("Checking if {0} is valid omnicache.".format(self.path)) + if (not os.path.isdir(self.path)): + logging.debug("{0} does not exist - not valid (not convertible).".format(self.path)) + return (False, False) + out = StringIO() + ret = RunCmd("git", "rev-parse --is-bare-repository", workingdir=self.path, outstream=out) + if (ret != 0): + logging.debug("{0} error getting repo state - not valid (not convertible).".format(self.path)) + return (False, False) + if (out.getvalue().strip().lower() == "true"): + out = StringIO() + ret = RunCmd("git", "config --local omnicache.metadata.version", workingdir=self.path, outstream=out) + if (ret != 0): + logging.debug("{0} - error retrieving omnicache version. not valid (is convertible).".format(self.path)) + return (False, True) + if (out.getvalue().strip() == OMNICACHE_VERSION): + logging.debug("{0} - matching omnicache version. valid (convertible don't care).".format(self.path)) + return (True, True) + else: + logging.debug("{0} - non-matching omnicache version. not valid (is convertible).".format(self.path)) + return (False, True) + logging.debug("{0} - not a bare repo. not valid (not convertible).".format(self.path)) + return (False, False) + + def _InitOmnicache(self): + """Initializes a new omnicache instance""" + logging.critical("Initializing Omnicache in {0}".format(self.path)) + os.makedirs(self.path, exist_ok=True) + ret = RunCmd("git", "init --bare", workingdir=self.path) + if (ret != 0): + return ret + return RunCmd("git", + "config --local omnicache.metadata.version {0}".format(OMNICACHE_VERSION), + workingdir=self.path) + + def _ConvertOmnicache(self): + """Converts an existing bare git repo from a previous omnicache version to the current omnicache version""" + logging.info("Converting Omnicache in {0} to latest format.".format(self.path)) + if (os.path.exists(os.path.join(self.path, PRE_0_11_OMNICACHE_FILENAME))): + os.remove(os.path.join(self.path, PRE_0_11_OMNICACHE_FILENAME)) + remotes = Omnicache.GetRemotes(self.path) + logging.info("Renaming non-UUID remotes with UUID.") + for (name, _) in remotes.items(): + if (not Omnicache._IsValidUuid(name)): + logging.info("Converting remote {0} to UUID".format(name)) + # Rename the remote with a valid UUID + newName = str(uuid.uuid4()) + ret = RunCmd("git", "remote rename {0} {1}".format(name, newName), workingdir=self.path) + if (ret != 0): + return ret + # Remove previous fetch config entries and regenerate them. Proceed to create new ones even if it fails. + RunCmd("git", "config --local --unset-all remote.{0}.fetch".format(newName), workingdir=self.path) + RunCmd("git", + "config --local --add remote.{0}.fetch +refs/heads/*:refs/remotes/{0}/*".format(newName), + workingdir=self.path) + RunCmd("git", + "config --local --add remote.{0}.fetch refs/tags/*:refs/rtags/{0}/*".format(newName), + workingdir=self.path) + # Add the original name as a display name + RunCmd("git", + "config --local omnicache.{0}.displayname {1}".format(newName, name), + workingdir=self.path) + logging.info("Remote {0} converted to {1}.".format(name, newName)) + # delete any tags in the global name space (older omnicaches fetched all tags into the global tag namespace) + logging.info("Removing global tags") + out = StringIO() + RunCmd("git", "tag -l", workingdir=self.path, outstream=out) + tags = " ".join(out.getvalue().splitlines()) + RunCmd("git", "tag -d {0}".format(tags), workingdir=self.path) + # remove duplicates + logging.info("Removing remotes with duplicate URLs") + knownUrls = [] + remotes = Omnicache.GetRemotes(self.path) + for (name, url) in remotes.items(): + if (url not in knownUrls): + logging.info("Retaining remote {0} with unique URL {1}".format(name, url)) + knownUrls.append(url) + else: + logging.info("Removing remote {0} with duplicate URL {1}".format(name, url)) + RunCmd("git", "remote remove {0}".format(name), workingdir=self.path) + RunCmd("git", "config --local --unset omnicache.{0}.displayname".format(name), workingdir=self.path) + # write current omnicache version into cache + logging.info("Writing Omnicache version") + return RunCmd("git", + "config --local omnicache.metadata.version {0}".format(OMNICACHE_VERSION), + workingdir=self.path) + + def _RefreshUrlLookupCache(self): + """Refreshes the URL lookup cache""" + if (len(self.urlLookupCache) == 0): + logging.info("Regenerating URL lookup cache.") + out = StringIO() + ret = RunCmd("git", r"config --local --get-regexp remote\..*?\.url", workingdir=self.path, outstream=out) + if (ret != 0): + return None + # output is in the form: remote..url + for remote in out.getvalue().splitlines(): + self.urlLookupCache[remote.split()[1]] = ".".join(remote.split()[0].split(".")[1:-1]) + + def _InvalidateUrlLookupCache(self): + """Invalidates the URL lookup cache""" + logging.debug("Invalidating URL lookup cache.") + self.urlLookupCache = {} + + def _LookupRemoteForUrl(self, url): + """Returns the git remote name for the specified URL, or None if it doesn't exist""" + self._RefreshUrlLookupCache() + if (url in self.urlLookupCache): + return self.urlLookupCache[url] + return None - def __init__(self, absfilepath): - self.version = OmniCacheConfig.CONFIG_VERSION - self.filepath = absfilepath - self.last_change = datetime.datetime.strftime(datetime.datetime.now(), "%A, %B %d, %Y %I:%M%p") - if os.path.isfile(self.filepath): - self._Load() - else: - self.remotes = {} - - def _Load(self): - with open(self.filepath) as yml_file: - content = yaml.safe_load(yml_file) - - if "version" not in content: - raise Exception("Unsupported Config Version (None)") - elif content["version"] == self.version: - # parse yml into config data - self.remotes = {x["name"]: x for x in content["remotes"]} - self.last_change = content["last_change"] - else: - self._Transition(content) - - def Save(self): - data = {"version": self.version, "remotes": list(self.remotes.values()), - "last_change": datetime.datetime.strftime(datetime.datetime.now(), - "%A, %B %d, %Y %I:%M%p")} - with open(self.filepath, 'w') as outfile: - yaml.dump(data, outfile, default_flow_style=False) - - def _Transition(self, data): - # Add code here to move old config data to new format - raise Exception("Unsupported config data") - - def Log(self, level=logging.DEBUG): - logging.log(level, "OmniCache Config") - logging.log(level, " Filepath: {0}".format(self.filepath)) - logging.log(level, " Version: {%d}", self.version) - logging.log(level, " Remotes({%d})", len(self.remotes)) - for remote in self.remotes.values(): - rstring = "Name: {0} Url: {1} TagSync: {2}".format(remote["name"], remote["url"], ("tag" in remote)) - logging.log(level, " " + rstring) - - def Add(self, name, url, tags=False): - # check if this already exists - if self.Contains_url(url): - logging.warning("Skipping add this entry %s %s" % (name, url)) - return - # if the name already exists, we overwrite it - remote = {"name": name, "url": url} - if tags: - remote["tag"] = True - self.remotes[name] = remote - - def Contains_url(self, url): - for x in self.remotes.values(): - if x["url"] == url: - return True + def AddRemote(self, url, name=None): + """Adds a remote for the specified URL to the omnicache + url - URL for which to add a remote + name - (optional) - provides a "display name" to be associated with this remote. + """ + # if we already have this remote (i.e. a remote with this URL exists), then just update. + if (self._LookupRemoteForUrl(url) is not None): + return self.UpdateRemote(url, newName=name) + # otherwise create it. + logging.info("Adding new remote for url {0}".format(url)) + newName = str(uuid.uuid4()) + ret = RunCmd("git", "remote add {0} {1}".format(newName, url), workingdir=self.path) + if (ret != 0): + return ret + self._InvalidateUrlLookupCache() + # add display name, if specified + if (name is not None): + ret = RunCmd("git", + "config --local omnicache.{0}.displayname {1}".format(newName, name), + workingdir=self.path) + if (ret != 0): + return ret + # add a special fetch refspec to fetch remote tags into a per-remote local namespace. + return RunCmd("git", + "config --local --add remote.{0}.fetch refs/tags/*:refs/rtags/{0}/*".format(newName), + workingdir=self.path) + + def RemoveRemote(self, url): + """Removes the remote for the specified url from the cache""" + name = self._LookupRemoteForUrl(url) + if (name is None): + logging.critical("Failed to remove node for url {0}: such a remote does not exist.".format(url)) + return 1 + logging.info("Removing remote for url {0}".format(url)) + ret = RunCmd("git", "remote remove {0}".format(name), workingdir=self.path) + self._InvalidateUrlLookupCache() + return ret + + def UpdateRemote(self, oldUrl, newUrl=None, newName=None): + """Updates the remote + oldUrl - current url for the remote + newUrl - (optional) if specified, updates the remote to point to the new URL. + newName - (optional) if specified, updates the "displayname" for the remote. + """ + remote = self._LookupRemoteForUrl(oldUrl) + if (remote is None): + logging.critical("Failed to update node for url {0}: such a remote does not exist.".format(oldUrl)) + return 1 + if (newName is not None): + logging.info("Updating display name for url {0} to {1}".format(oldUrl, newName)) + ret = RunCmd("git", + "config --local omnicache.{0}.displayname {1}".format(remote, newName), + workingdir=self.path) + if (ret != 0): + return ret + if (newUrl is not None): + logging.info("Updating url {0} to {1}".format(oldUrl, newUrl)) + ret = RunCmd("git", "remote set-url {0} {1}".format(remote, newUrl), workingdir=self.path) + self._InvalidateUrlLookupCache() + if (ret != 0): + return ret + + return 0 + + def Fetch(self): + """Fetches all remotes""" + logging.info("Fetching all remotes.") + self._RefreshUrlLookupCache() + # Tricky: we pass no-tags here, since we set up custom fetch refs for tags on a per-remote basis. This prevents + # git from fetching the first set of tags into the global namespace. + return RunCmd("git", "fetch --all -j {0} --no-tags".format(len(self.urlLookupCache)), workingdir=self.path) + + def GetRemoteData(self): + """Gets Remote Data + Returns: a dict of dicts of the form: + {"":{"url":, "displayname":}} + note that "displayname" may not be present if the remote has no display name. + """ + logging.info("Retrieving all remote data") + self._RefreshUrlLookupCache() + remoteData = {} + for url in self.urlLookupCache.keys(): + remoteData[self.urlLookupCache[url]] = {"url": url} + + out = StringIO() + ret = RunCmd("git", + r"config --local --get-regexp omnicache\..*?\.displayname", + workingdir=self.path, + outstream=out) + if (ret != 0): + return remoteData + + for displayName in out.getvalue().splitlines(): + remoteName = displayName.split()[0].split(".")[1] + if (remoteName in remoteData.keys()): + remoteData[remoteName].update({"displayname": displayName.split()[1]}) + return remoteData + + def List(self): + """Prints the current set of remotes""" + print("List OMNICACHE content:\n") + remoteData = self.GetRemoteData() + if (len(remoteData) == 0): + print("No remotes.") + for (name, data) in remoteData.items(): + print("Id {0}: {1}".format(name, str(data))) + + @staticmethod + def GetRemotes(path): + """Gets the remotes for the git repository at the specified path. + Returns: dict of the form {:} + """ + logging.info("Retreiving remotes") + remotes = {} + out = StringIO() + + ret = RunCmd("git", "remote -v", workingdir=path, outstream=out) + if (ret != 0): + return remotes + + # Note: this loop assumes fetch and push URLs will be identical. If not, the last URL output will be the result. + for remote in out.getvalue().splitlines(): + remoteInfo = remote.split() + remotes[remoteInfo[0]] = remoteInfo[1] + + return remotes + + @staticmethod + def _IsValidUuid(val): + """Returns whether the input is valid UUID""" + try: + uuid.UUID(str(val)) + return True + except ValueError: + pass return False - def Contains_name(self, name): - for x in self.remotes.values(): - if x["name"] == name: - return True - return False - def GetNameForUrl(self, url): - for x in self.remotes.values(): - if (x["url"] == url): - return x["name"] - return None - - def Remove(self, del_name): - del self.remotes[del_name] - - def Contains(self, name): - return name in self.remotes +def ProcessInputConfig(omnicache, input_config): + """Adds a list of remotes from a YAML config file to the omnicache""" + logging.info("Adding remotes from {0}".format(input_config)) + with open(input_config) as icf: + content = yaml.safe_load(icf) + if "remotes" in content: + for remote in content["remotes"]: + # dict.get() used here to set name=None if no name specified in input cfg file. + omnicache.AddRemote(remote["url"], name=remote.get("name")) -OMNICACHE_VERSION = "0.10" -OMNICACHE_FILENAME = "omnicache.yaml" + return 0 -def CommonFilePathHandler(path): - ''' - function to check for absolute path and if not - concat with current dir and return absolute real path - ''' - if not os.path.isabs(path): - path = os.path.join(os.getcwd(), path) - path = os.path.realpath(path) - return path +def ScanDirectory(omnicache, scanpath): + """Recursively scans a directory for git repositories and adds remotes and submodule remotes to omnicache""" + logging.info("Scanning {0} for remotes to add.".format(scanpath)) + if (not os.path.isdir(scanpath)): + logging.critical("specified scan path is invalid.") + return -1 + + for (dirpath, dirnames, filenames) in os.walk(scanpath): + if (".git" in dirnames): + newRemotes = Omnicache.GetRemotes(dirpath) + for (name, url) in newRemotes.items(): + omnicache.AddRemote(url, name) + + if (".gitmodules" in filenames): + out = StringIO() + ret = RunCmd("git", "config --file .gitmodules --get-regexp url", workingdir=dirpath, outstream=out) + if (ret == 0): + for submodule in out.getvalue().splitlines(): + url = submodule.split()[1] + name = submodule.split()[0].split(".")[1] + omnicache.AddRemote(url, name) + return 0 -def AddEntriesFromConfig(config, input_config_file): - ''' - Add config entries found in the config file - to the omnicache. Entries already in omnicache - with the same name will be updated. - return - the number of entries added to cache - ''' +def Export(omnicache, exportPath): + """Exports omnicache configuration to YAML""" + logging.info("Exporting omnicache config for {0} to {1}".format(omnicache.path, exportPath)) + content = [] + for (name, data) in omnicache.GetRemoteData().items(): + remoteToWrite = {"url": data["url"]} + if ("displayname" in data): + remoteToWrite["name"] = data["displayname"] + else: + remoteToWrite["name"] = name + content.append(remoteToWrite) - count = 0 - with open(input_config_file) as yml_file: - content = yaml.safe_load(yml_file) - if "remotes" in content: - for remote in content["remotes"]: - currentRemoteName = config.GetNameForUrl(remote["url"]) - if (currentRemoteName is not None): - if (remote["name"] != currentRemoteName): - logging.debug( - "remote with name: {0} already in cache, renaming to {1}" - .format(currentRemoteName, remote["name"]) - ) - RemoveEntry(config, currentRemoteName) # remove here, then fall through to add entry below. - else: - logging.debug("remote with name: {0} already in cache".format(remote["name"])) - continue - if "tag" in remote: - AddEntry(config, remote["name"], remote["url"], bool(remote["tag"])) - else: - AddEntry(config, remote["name"], remote["url"]) - count += 1 - return (count, content["remotes"]) - - -def InitOmnicache(path): - logging.critical("Initialize Omnicache to {0}".format(path)) - os.makedirs(path) - return utility_functions.RunCmd("git", "--bare init", workingdir=path) - - -def AddEntry(config, name, url, tags=False): - logging.info("Adding remote ({0} : {1}) to Omnicache".format(name, url)) - - if config.Contains(name): - logging.info("Updating remote ({0} : {1}) in Omnicache".format(name, url)) - param = "remote set-url {0} {1}".format(name, url) - else: - logging.info("Adding remote ({0} : {1}) to Omnicache".format(name, url)) - param = "remote add {0} {1}".format(name, url) - - if(utility_functions.RunCmd("git", param) == 0): - config.Add(name, url, tags) - else: - logging.error("Failed to add remote for {0}".format(name)) - - -def RemoveEntry(config, name): - logging.info("Removing remote named {0}".format(name)) - param = "remote remove {0}".format(name) - if utility_functions.RunCmd("git", param) == 0: - config.Remove(name) - else: - logging.error("Failed to remove remote for {0}".format(name)) - - -def ConsistencyCheckCacheConfig(config): - ''' - Check the git remote list vs what is in the config file - Add remote to git for anything only in config - Add git remote from git into the config file (tags will be false) - - return - 0: success - non-zero: indicates an error - ''' - - logging.debug("start consistency check between git and omnicache config") - out = StringIO() - param = "remote -v" - gitnames = [] # list of git remote names as found in git repo - gitret = utility_functions.RunCmd("git", param, outstream=out) - - if gitret != 0: - logging.critical("Could not list git remotes") - return gitret - - lines = out.getvalue().split('\n') - out.close() - for line in lines: - line = line.strip() - if len(line) == 0: - # empty line - continue - git = line.split() - gitnames.append(git[0]) # save for later - if(not config.Contains(git[0])): - logging.warning("Found entry in git not in config. Name: {0} Url: {1}".format(git[0], git[1])) - config.Add(git[0], git[1]) - config.Save() - - gitnames = set(gitnames) - for remote in config.remotes.values(): - if(remote["name"] not in gitnames): - logging.warning("Found entry in config not in git. Name: {0} Url: {1}".format(remote["name"], - remote["url"])) - param = "remote add {0} {1}".format(remote["name"], remote["url"]) - utility_functions.RunCmd("git", param) + with open(exportPath, "w") as ocf: + yaml.dump({"remotes": content}, ocf) return 0 -def MultiFetch(remotes, tags=False, retries=3): - ''' - do git operations to fetch the specified entries. Supports fetching in parallel for speed, and supports retries to - add robustness to transient fetch failures. - - return - 0: success - non-zero: git command line error unresolved by retry. - ''' - param = "fetch" - param += " -j {0}".format(len(remotes)) - if (tags): - # might want to look at something more complex to avoid tag conflicts - # https://stackoverflow.com/questions/22108391/git-checkout-a-remote-tag-when-two-remotes-have-the-same-tag-name - # param += "+refs/heads/:refs/remotes/{0}/ +refs/tags/:refs/rtags/{0}/".format(name) - param += " --tags" - else: - param += " --no-tags" - param += " --multiple " + " ".join(remotes) - - for _ in range(retries): - ret = utility_functions.RunCmd("git", param) - if ret == 0: - break - return ret - - def get_cli_options(): parser = argparse.ArgumentParser(description='Tool to provide easy method create and manage the OMNICACHE', ) parser.add_argument(dest="cache_dir", help="path to an existing or desired OMNICACHE directory") @@ -267,17 +381,19 @@ def get_cli_options(): action="store_true", default=False) parser.add_argument("-l", "--list", dest="list", default=False, action="store_true", help="List config of OMNICACHE") - parser.add_argument("-a", "--add", dest="add", nargs='*', action="append", - help="Add config entry to OMNICACHE ", + parser.add_argument("-a", "--add", dest="add", nargs=2, action="append", + help="Add config entry to OMNICACHE ", default=[]) parser.add_argument("-c", "--configfile", dest="input_config_file", default=None, help="Add new entries from config file to OMNICACHE") + parser.add_argument("-e", "--exportConfig", dest="output_config_file", default=None, + help="Export current omnicache config as a yaml file") group = parser.add_mutually_exclusive_group() group.add_argument("-u", "--update", "--fetch", dest="fetch", action="store_true", help="Update the Omnicache. All cache changes also cause a fetch", default=False) group.add_argument("--no-fetch", dest="no_fetch", action="store_true", help="Prevent auto-fetch if implied by other arguments.", default=False) - parser.add_argument("-r", "--remove", dest="remove", nargs="?", action="append", + parser.add_argument("-r", "--remove", dest="remove", nargs=1, action="append", help="remove config entry from OMNICACHE ", default=[]) parser.add_argument('--version', action='version', version='%(prog)s ' + OMNICACHE_VERSION) parser.add_argument("--debug", dest="debug", help="Output all debug messages to console", @@ -293,11 +409,6 @@ def main(): console = edk2_logging.setup_console_logging(False) logger.addHandler(console) - ErrorCode = 0 - auto_fetch = False - input_config_remotes = None - - # arg parse args = get_cli_options() if args.debug: @@ -306,141 +417,82 @@ def main(): logging.info("Log Started: " + datetime.datetime.strftime( datetime.datetime.now(), "%A, %B %d, %Y %I:%M%p")) - args.cache_dir = CommonFilePathHandler(args.cache_dir) + args.cache_dir = os.path.realpath(os.path.abspath(args.cache_dir)) logging.debug("OMNICACHE dir: {0}".format(args.cache_dir)) - # input config file for adding new entries if args.input_config_file is not None: - args.input_config_file = CommonFilePathHandler(args.input_config_file) + args.input_config_file = os.path.realpath(os.path.abspath(args.input_config_file)) if not os.path.isfile(args.input_config_file): logging.critical("Invalid -c argument given. File ({0}) isn't valid".format(args.input_config_file)) return -4 logging.debug("Args: " + str(args)) - omnicache_config = None # config object - omnicache_config_file = os.path.join(args.cache_dir, OMNICACHE_FILENAME) - + omnicache = None + auto_fetch = False + # new: initialize omnicache and error if it already exists if args.new: if os.path.isdir(args.cache_dir): logging.critical("--new argument given but OMNICACHE path already exists!") return -1 - InitOmnicache(args.cache_dir) + omnicache = Omnicache(args.cache_dir, create=True) auto_fetch = True - if args.init: - if os.path.isdir(args.cache_dir): - if os.path.isfile(omnicache_config_file): - logging.debug("OMNICACHE already exists. No need to initialize") - else: - InitOmnicache(args.cache_dir) + # init: initialize omnicache if not already initialized. + if args.init and omnicache is None: + omnicache = Omnicache(args.cache_dir, create=True, convert=True) auto_fetch = True - # Check to see if exists - if not os.path.isdir(args.cache_dir): - logging.critical("OMNICACHE path invalid.") - return -2 - - # load config - omnicache_config = OmniCacheConfig(omnicache_config_file) + # Other args require omnicache, so check that it exists. + if omnicache is None: + omnicache = Omnicache(args.cache_dir) - os.chdir(args.cache_dir) + # add: add new source(s) to omnicache from command line arg. + if (len(args.add) > 0): + for (name, url) in args.add: + ret = omnicache.AddRemote(url, name) + if (ret != 0): + return -3 + auto_fetch = True - if(len(args.add) > 0): + # config: add or update sources(s) from config file. + if (args.input_config_file is not None): + ret = ProcessInputConfig(omnicache, args.input_config_file) + if (ret != 0): + return -4 auto_fetch = True - for inputdata in args.add: - if len(inputdata) == 2: - AddEntry(omnicache_config, inputdata[0], inputdata[1]) - elif len(inputdata) == 3: - AddEntry(omnicache_config, inputdata[0], inputdata[1], bool(inputdata[2])) - else: - logging.critical("Invalid Add Entry. Should be ") - return -3 - if(args.input_config_file is not None): - (count, input_config_remotes) = AddEntriesFromConfig(omnicache_config, args.input_config_file) - if(count > 0): - auto_fetch = True + # remove: remove source(s) from omnicache as specified by command line arg. + if (len(args.remove) > 0): + for url in args.remove: + ret = omnicache.RemoveRemote(url) + if (ret != 0): + return -4 + + # scan: recursively scan the given directory and add all repos and submodules + if (args.scan is not None): + ret = ScanDirectory(omnicache, args.scan) + if (ret != 0): + return -5 + auto_fetch = True - if len(args.remove) > 0: - for inputdata in args.remove: - RemoveEntry(omnicache_config, inputdata) + # fetch: update the omnicache with objects from its remotes. + # Note: errors are ignored here, since transient network failures may occur that prevent cache update. Those just + # mean the omnicache may be a a little stale which should not be fatal to users of the cache. + if(args.fetch or (auto_fetch and not args.no_fetch)): + omnicache.Fetch() - # if we need to scan - if args.scan is not None: - logging.critical("OMNICACHE is scanning the folder %s.") - if not os.path.isdir(args.scan): - logging.error("Invalid scan directory") - return -4 - reposFound = dict() - # iterate through top level directories - dirs = os.listdir(args.scan) - while len(dirs) > 0: - item = dirs.pop() - itemDir = os.path.join(args.scan, item) - if os.path.isfile(itemDir): - continue - logging.info("Scanning %s for a git repo" % item) - gitDir = os.path.join(itemDir, ".git") - # Check if it's a directory or a file (submodules usually have a file instead of a folder) - if os.path.isdir(gitDir) or os.path.isfile(gitDir): - repo = Repo(itemDir) - if repo.url: - if repo.url not in reposFound: - reposFound[repo.url] = item - else: - logging.warning("Skipping previously found repo at %s with url %s" % (item, repo.url)) - else: # if repo.url is none - logging.error("Url not found for git repo at: %s" % itemDir) - # check for submodules - if repo.submodules: - for submodule in repo.submodules: - dirs.append(os.path.join(item, submodule)) - else: - logging.error("Git repo not found at %s" % itemDir) - # go through all the URLs found - for url in reposFound: - omnicache_config.Add(reposFound[url], url) + # list: print out the omnicache contents. + if (args.list): + omnicache.List() - omnicache_config.Save() + # export: + if (args.output_config_file): + ret = Export(omnicache, args.output_config_file) + if (ret != 0): + return -6 - if(args.fetch or (auto_fetch and not args.no_fetch)): - logging.critical("Updating OMNICACHE") - # as an optimization, if input config file provided, only fetch remotes specified in input config - # otherwise, fetch all remotes in the OmniCache - if (input_config_remotes is not None): - remotes = [x["name"] for x in input_config_remotes] - else: - remotes = [omnicache_config.remotes.keys()] - - # Fetch no-tags and tags separately. - no_tags = [omnicache_config.remotes[x]["name"] for x in remotes if "tag" not in omnicache_config.remotes[x]] - with_tags = [omnicache_config.remotes[x]["name"] for x in remotes if "tag" in omnicache_config.remotes[x]] - if (len(no_tags) > 0): - ret = MultiFetch(no_tags, tags=False) - if (ret != 0) and (ErrorCode == 0): - ErrorCode = ret - if (len(with_tags) > 0): - ret = MultiFetch(with_tags, tags=True) - if (ret != 0) and (ErrorCode == 0): - ErrorCode = ret - - if args.list: - ret = ConsistencyCheckCacheConfig(omnicache_config) - if (ret != 0) and (ErrorCode == 0): - ErrorCode = ret - print("List OMNICACHE content\n") - if len(omnicache_config.remotes) == 0: - logging.warning("No Remotes to show") - - for remote in omnicache_config.remotes.values(): - rstring = "Name: {0}\n Url: {1}\n Sync Tags: {2}".format(remote["name"], remote["url"], ("tag" in remote)) - print(" " + rstring + "\n\n") - - print("To use your OMNICACHE set the env variable:") - print("set OMNICACHE_PATH=" + args.cache_dir) - - return ErrorCode + return 0 if __name__ == '__main__': diff --git a/edk2toolext/tests/test_omnicache.py b/edk2toolext/tests/test_omnicache.py index 3175a574..7301f275 100644 --- a/edk2toolext/tests/test_omnicache.py +++ b/edk2toolext/tests/test_omnicache.py @@ -11,6 +11,8 @@ import logging import tempfile import shutil +import yaml +import sys from io import StringIO from edk2toolext import omnicache from edk2toollib import utility_functions @@ -27,7 +29,13 @@ def prep_workspace(): test_dir = tempfile.mkdtemp() logging.debug("temp dir is: %s" % test_dir) else: - shutil.rmtree(test_dir) + for _ in range(3): + try: + shutil.rmtree(test_dir) + except OSError as err: + logging.warning(f"Failed to fully remove {test_dir}: {err}") + else: + break test_dir = tempfile.mkdtemp() current_dir = os.path.abspath(os.getcwd()) @@ -39,7 +47,13 @@ def clean_workspace(): return if os.path.isdir(test_dir): - shutil.rmtree(test_dir) + for _ in range(3): + try: + shutil.rmtree(test_dir) + except OSError as err: + logging.warning(f"Failed to fully remove {test_dir}: {err}") + else: + break test_dir = None @@ -57,84 +71,416 @@ def setUpClass(cls): def tearDownClass(cls): clean_workspace() - def test_basic_init(self): - valueabs = os.path.join(os.path.abspath(os.getcwd()), "test", "test2") - result = omnicache.CommonFilePathHandler(valueabs) - assert(result == valueabs) + def test_omnicache_new(self): + testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") + + # create a new cache + oc = omnicache.Omnicache(testcache, create=True, convert=False) + + # check that the new cache was created as a bare repo + out = StringIO() + gitret = utility_functions.RunCmd("git", "rev-parse --is-bare-repository", workingdir=testcache, outstream=out) + assert(gitret == 0) + assert(out.getvalue().strip().lower() == "true") + + # check that it has the right metadata + out = StringIO() + gitret = utility_functions.RunCmd("git", + "config --local omnicache.metadata.version", + workingdir=testcache, + outstream=out) + assert(gitret == 0) + assert(out.getvalue().strip().lower() == omnicache.OMNICACHE_VERSION) + + # check that omnicache thinks it's valid + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + # remove the metadata to simulate an older omnicache that is candidate for conversion + gitret = utility_functions.RunCmd("git", + "config --local --unset omnicache.metadata.version", + workingdir=testcache) + assert(gitret == 0) + + # check that omnicache thinks it's not valid but convertible. + (valid, convertible) = oc._ValidateOmnicache() + assert(not valid) + assert(convertible) + + # attempt to create a new cache - test repo is valid git repo, but doesn't have the meta. + with self.assertRaises(RuntimeError): + oc = omnicache.Omnicache(testcache, create=False, convert=False) + + # set the metadata version to an unexpected value + gitret = utility_functions.RunCmd( + "git", + "config --local omnicache.metadata.version {0}".format(omnicache.OMNICACHE_VERSION + "x"), + workingdir=testcache) + assert(gitret == 0) + + # check that omnicache thinks it's not valid but convertible. + (valid, convertible) = oc._ValidateOmnicache() + assert(not valid) + assert(convertible) + + # attempt to create a new cache - test repo is valid git repo, but doesn't have the expected meta. + with self.assertRaises(RuntimeError): + oc = omnicache.Omnicache(testcache, create=False, convert=False) - def test_commonfilepathhandler_real(self): - valueabs = os.path.join(os.path.abspath(os.getcwd()), "test", "test2") - result = omnicache.CommonFilePathHandler(os.path.join(valueabs, "..", "test2")) - assert(result == valueabs) + # now make it a non-bare (but technically valid) git repo + gitret = utility_functions.RunCmd("git", "config --local core.bare false", workingdir=testcache) + assert(gitret == 0) - def test_commonfilepathhandler_relative(self): - valueabs = os.path.join(os.path.abspath(os.getcwd()), "test", "test2") - result = omnicache.CommonFilePathHandler(os.path.join("test", "test2")) - assert(result == valueabs) + # check that omnicache thinks it's not valid and not convertible. + (valid, convertible) = oc._ValidateOmnicache() + assert(not valid) + assert(not convertible) - def test_omnicache_init(self): + # attempt to create a new cache - test repo is valid non-bare git repo + with self.assertRaises(RuntimeError): + oc = omnicache.Omnicache(testcache, create=False, convert=False) + + # make it it a non-valid git repo + os.remove(os.path.join(testcache, "HEAD")) + + # check that omnicache thinks it's not valid and not convertible. + (valid, convertible) = oc._ValidateOmnicache() + assert(not valid) + assert(not convertible) + + # attempt to create a new cache - test repo is not valid git repo + with self.assertRaises(RuntimeError): + oc = omnicache.Omnicache(testcache, create=False, convert=False) + + def test_omnicache_convert(self): testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") - testconfigs = [ - { - "cfgfile": os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcfg.yaml"), - "name": "openssl", - "url": "https://github.com/openssl/openssl.git", - "tag": "true" - }, - { - "cfgfile": os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcfg2.yaml"), - "name": "openssl", - "url": "https://foobar.com/openssl/openssl.git", - "tag": "true" - } - ] - - for testconfig in testconfigs: - currentdir = os.path.abspath(os.getcwd()) - with open(testconfig["cfgfile"], "w") as configyaml: - configyaml.write("remotes:\n") - configyaml.write("- name: {0}\n".format(testconfig["name"])) - configyaml.write(" url: {0}\n".format(testconfig["url"])) - configyaml.write(" tag: {0}\n".format(testconfig["tag"])) - - omnicache_config_file = os.path.join(testcache, omnicache.OMNICACHE_FILENAME) - if(os.path.isdir(testcache)): - if(os.path.isfile(omnicache_config_file)): - logging.debug("OMNICACHE already exists. No need to initialize") + + # create a new cache + oc = omnicache.Omnicache(testcache, create=True, convert=False) + + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + # remove the metadata to simulate an older omnicache that is candidate for conversion + gitret = utility_functions.RunCmd("git", + "config --local --unset omnicache.metadata.version", + workingdir=testcache) + assert(gitret == 0) + + # add an empty file to simulate the old config yaml + with open(os.path.join(testcache, omnicache.PRE_0_11_OMNICACHE_FILENAME), "w") as yf: + yf.write("Not A Real YAML File") + + # confirm that _ValidateOmnicache correctly identifies cache state + (valid, convertible) = oc._ValidateOmnicache() + assert(not valid) + assert(convertible) + + # add a traditionally-named remote to the cache to simulate an older omnicache + gitret = utility_functions.RunCmd( + "git", + "remote add pytools-ext https://github.com/tianocore/edk2-pytool-extensions.git", + workingdir=testcache) + assert(gitret == 0) + + # add a second copy of the remote remote with a different name to the cache to simulate a duplicate + gitret = utility_functions.RunCmd( + "git", + "remote add pytools-ext2 https://github.com/tianocore/edk2-pytool-extensions.git", + workingdir=testcache) + assert(gitret == 0) + + # fetch the remote and its tags to populate the cache for conversion. + gitret = utility_functions.RunCmd("git", "fetch pytools-ext --tags", workingdir=testcache) + assert(gitret == 0) + + # confirm that _ValidateOmnicache still correctly identifies cache state + (valid, convertible) = oc._ValidateOmnicache() + assert(not valid) + assert(convertible) + + # re-create the omnicache object to trigger conversion + oc = omnicache.Omnicache(testcache, create=False, convert=True) + + # validate converted cache + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + # verify that old config file was deleted. + assert(not os.path.exists(os.path.join(testcache, omnicache.PRE_0_11_OMNICACHE_FILENAME))) + + # verify that the traditionally-named remote is no longer in the cache (it should have been renamed with a UUID) + remotes = omnicache.Omnicache.GetRemotes(testcache) + assert("pytools-ext" not in remotes.keys()) + + # verify that the URL still is in the cache + assert("https://github.com/tianocore/edk2-pytool-extensions.git" in remotes.values()) + + # verify that there is exactly one entry in the cache for this URL (duplicates should be removed) + assert(sum(url == "https://github.com/tianocore/edk2-pytool-extensions.git" for url in remotes.values()) == 1) + + # verify that the former remote name is now the "omnicache display name" + for (name, url) in remotes.items(): + if (url == "https://github.com/tianocore/edk2-pytool-extensions.git"): + out = StringIO() + gitret = utility_functions.RunCmd("git", + f"config --local omnicache.{name}.displayname", + workingdir=testcache, + outstream=out) + assert(gitret == 0) + displayname = out.getvalue().strip() + # if there are duplicates, which displayname is chosen is arbitrary, so allow either. + assert(displayname == "pytools-ext" or displayname == "pytools-ext2") + + # verify that there are no global tags in the root. + out = StringIO() + gitret = utility_functions.RunCmd("git", "tag -l", workingdir=testcache, outstream=out) + assert(gitret == 0) + assert(out.getvalue().strip() == "") + + def test_omnicache_add_remove(self): + testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") + + # create a new cache + oc = omnicache.Omnicache(testcache, create=True, convert=False) + + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + # add a remote with display name + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions.git", name="pytools-ext") + assert(ret == 0) + + assert(len(omnicache.Omnicache.GetRemotes(testcache).keys()) == 1) + assert("https://github.com/tianocore/edk2-pytool-extensions.git" + in omnicache.Omnicache.GetRemotes(testcache).values()) + + # get the remote UUID + remoteName = oc._LookupRemoteForUrl("https://github.com/tianocore/edk2-pytool-extensions.git") + assert(remoteName is not None) + + # confirm that remote data is as expected + remoteData = oc.GetRemoteData() + assert(len(remoteData.keys()) == 1) + assert(remoteData[remoteName]["url"] == "https://github.com/tianocore/edk2-pytool-extensions.git") + assert(remoteData[remoteName]["displayname"] == "pytools-ext") + + # remove the remote and make sure it is gone + ret = oc.RemoveRemote("https://github.com/tianocore/edk2-pytool-extensions.git") + assert(ret == 0) + assert(len(omnicache.Omnicache.GetRemotes(testcache).keys()) == 0) + assert("https://github.com/tianocore/edk2-pytool-extensions.git" + not in omnicache.Omnicache.GetRemotes(testcache).values()) + + # add a remote without display name + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions.git") + assert(ret == 0) + + assert(len(omnicache.Omnicache.GetRemotes(testcache).keys()) == 1) + assert("https://github.com/tianocore/edk2-pytool-extensions.git" + in omnicache.Omnicache.GetRemotes(testcache).values()) + + # get the remote UUID + remoteName = oc._LookupRemoteForUrl("https://github.com/tianocore/edk2-pytool-extensions.git") + assert(remoteName is not None) + + # confirm that remote data is as expected + remoteData = oc.GetRemoteData() + assert(len(remoteData.keys()) == 1) + assert(remoteData[remoteName]["url"] == "https://github.com/tianocore/edk2-pytool-extensions.git") + assert("displayname" not in remoteData[remoteName]) + + # add a remote that already exists (with new display name) and make sure it is treated as an update. + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions.git", name="pytools-ext2") + assert(ret == 0) + + assert(len(omnicache.Omnicache.GetRemotes(testcache).keys()) == 1) + assert("https://github.com/tianocore/edk2-pytool-extensions.git" + in omnicache.Omnicache.GetRemotes(testcache).values()) + + # get the remote UUID + remoteName = oc._LookupRemoteForUrl("https://github.com/tianocore/edk2-pytool-extensions.git") + assert(remoteName is not None) + + # confirm that remote data is as expected + remoteData = oc.GetRemoteData() + assert(len(remoteData.keys()) == 1) + assert(remoteData[remoteName]["url"] == "https://github.com/tianocore/edk2-pytool-extensions.git") + assert(remoteData[remoteName]["displayname"] == "pytools-ext2") + + # attempt to remove a non-existent remote + ret = oc.RemoveRemote("http://thisisnot.com/good.git") + assert (ret != 0) + + def test_omnicache_update(self): + testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") + + # create a new cache + oc = omnicache.Omnicache(testcache, create=True, convert=False) + + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + # add a remote with display name + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions.git", name="pytools-ext") + assert(ret == 0) + + assert(len(omnicache.Omnicache.GetRemotes(testcache).keys()) == 1) + assert("https://github.com/tianocore/edk2-pytool-extensions.git" + in omnicache.Omnicache.GetRemotes(testcache).values()) + + # get the remote UUID + remoteName = oc._LookupRemoteForUrl("https://github.com/tianocore/edk2-pytool-extensions.git") + assert(remoteName is not None) + + # update the URL and displayname of the remote + ret = oc.UpdateRemote( + "https://github.com/tianocore/edk2-pytool-extensions.git", + newUrl="https://github.com/tianocore/edk2-pytool-extensions2.git", + newName="pytools-ext2") + assert(ret == 0) + + assert(len(omnicache.Omnicache.GetRemotes(testcache).keys()) == 1) + assert("https://github.com/tianocore/edk2-pytool-extensions2.git" + in omnicache.Omnicache.GetRemotes(testcache).values()) + + # make sure UUID didn't change + assert(remoteName == oc._LookupRemoteForUrl("https://github.com/tianocore/edk2-pytool-extensions2.git")) + + # confirm that remote data is as expected + remoteData = oc.GetRemoteData() + assert(len(remoteData.keys()) == 1) + assert(remoteData[remoteName]["url"] == "https://github.com/tianocore/edk2-pytool-extensions2.git") + assert(remoteData[remoteName]["displayname"] == "pytools-ext2") + + # update a non-existent URL in the cache and confirm error is returned + ret = oc.UpdateRemote("https://not.a.real.url.com/git") + assert (ret != 0) + + def test_omnicache_fetch(self): + testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") + + # create a new cache + oc = omnicache.Omnicache(testcache, create=True, convert=False) + + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + # add a remote with display name + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions.git", name="pytools-ext") + assert(ret == 0) + + # fetch the remote + ret = oc.Fetch() + assert (ret == 0) + + # get the remote UUID + remoteName = oc._LookupRemoteForUrl("https://github.com/tianocore/edk2-pytool-extensions.git") + assert(remoteName is not None) + + # verify that branches were fetched into the omnicache + assert(len(os.listdir(os.path.join(testcache, "refs", "remotes", remoteName))) != 0) + + # verify that tags were fetched into the omnicache + assert(len(os.listdir(os.path.join(testcache, "refs", "rtags", remoteName))) != 0) + + def test_omnicache_list(self): + testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") + + # create a new cache + oc = omnicache.Omnicache(testcache, create=True, convert=False) + + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + oc.List() + + # add a remote with display name + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions.git", name="pytools-ext") + assert(ret == 0) + + oc.List() + + def test_config_files(self): + testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") + testyaml = os.path.join(os.path.abspath(os.getcwd()), test_dir, "cfg.yaml") + + # create a new cache + oc = omnicache.Omnicache(testcache, create=True, convert=False) + + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + # add a remote with display name + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions.git", name="pytools-ext") + assert(ret == 0) + + # add a remote with no display name + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions2.git") + assert(ret == 0) + + # export yaml cfg + ret = omnicache.Export(oc, testyaml) + assert(ret == 0) + + # inspect the yaml for correctness + with open(testyaml) as yf: + content = yaml.safe_load(yf) + + assert("remotes" in content) + assert(len(content["remotes"]) == 2) + for remote in content["remotes"]: + if (remote["url"] == "https://github.com/tianocore/edk2-pytool-extensions.git"): + assert (remote["name"] == "pytools-ext") + elif (remote["url"] == "https://github.com/tianocore/edk2-pytool-extensions2.git"): + assert (omnicache.Omnicache._IsValidUuid(remote["name"])) + # remove the "display name" for input test below + del remote["name"] + else: + # not one of the URLs we populated above = bad. + assert(remote["url"] not in remote.values()) + + # save the yaml file (since we removed one of the displaynames) + with open(testyaml, "w") as yf: + yaml.dump(content, yf) + + # remove the remotes + ret = oc.RemoveRemote("https://github.com/tianocore/edk2-pytool-extensions.git") + assert (ret == 0) + ret = oc.RemoveRemote("https://github.com/tianocore/edk2-pytool-extensions2.git") + assert (ret == 0) + + # confirm we have no remotes + assert(len(oc.GetRemoteData()) == 0) + + # import yaml cfg + ret = omnicache.ProcessInputConfig(oc, testyaml) + assert(ret == 0) + + # check resulting omnicache config + for remote in oc.GetRemoteData().values(): + if (remote["url"] == "https://github.com/tianocore/edk2-pytool-extensions.git"): + assert (remote["displayname"] == "pytools-ext") + elif (remote["url"] == "https://github.com/tianocore/edk2-pytool-extensions2.git"): + assert ("displayname" not in remote) else: - omnicache.InitOmnicache(testcache) - - omnicache_config = omnicache.OmniCacheConfig(omnicache_config_file) - os.chdir(testcache) - - (count, input_config_remotes) = omnicache.AddEntriesFromConfig(omnicache_config, testconfig["cfgfile"]) - - assert(count == 1) - assert(input_config_remotes is not None) - assert(input_config_remotes[0]["name"] == testconfig["name"]) - assert(input_config_remotes[0]["url"] == testconfig["url"]) - - omnicache_config.Save() - - # check that cache properly initialized/updated - out = StringIO() - param = "remote -v" - gitret = utility_functions.RunCmd("git", param, outstream=out) - assert(gitret == 0) - - lines = out.getvalue().split('\n') - out.close() - assert (len(lines) > 0) - for line in lines: - line = line.strip() - if(len(line) == 0): - # empty line - continue - git = line.split() - assert(git[0] == input_config_remotes[0]["name"]) - assert(git[1] == input_config_remotes[0]["url"]) - - os.chdir(currentdir) + # not one of the URLs we populated above = bad. + assert(remote["url"] not in remote.values()) + + def test_omnicache_main(self): + testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") + # shameless code coverage play + oldargs = sys.argv + sys.argv = ["omnicache", "--init", testcache] + ret = omnicache.main() + assert(ret == 0) + sys.argv = ["omnicache", "--new", testcache] + ret = omnicache.main() + assert(ret != 0) + sys.argv = ["omnicache", "--scan", testcache, testcache] + ret = omnicache.main() + assert(ret == 0) + sys.argv = oldargs if __name__ == '__main__':