diff --git a/.cspell.json b/.cspell.json index 3c21092c..6be0b4e7 100644 --- a/.cspell.json +++ b/.cspell.json @@ -123,6 +123,7 @@ "dbxupdate", "FileFlagsMask", "markdownlint", - "xffd" + "xffd", + "rtags" ] } diff --git a/edk2toolext/omnicache.py b/edk2toolext/omnicache.py index a059cef5..35f608c1 100644 --- a/edk2toolext/omnicache.py +++ b/edk2toolext/omnicache.py @@ -10,252 +10,366 @@ import argparse import datetime import yaml +import uuid from io import StringIO from edk2toolext import edk2_logging -from edk2toollib import utility_functions -from edk2toolext.edk2_git import Repo - - -class OmniCacheConfig(): - ''' - class to manage the Internal Omnicache config file. - Load, Save, Version check, etc. - ''' - - CONFIG_VERSION = 1 +from edk2toollib.utility_functions import RunCmd + +# Omnicache version 0.11 design notes: +# 1. Only a single remote per URL will be permitted. +# 2. Remote names inside omnicache will be UUIDs generated when a new URL is added. +# 3. Users may specify names for remotes, but they will be treated simply as "display names" without git significance. +# 4. As many objects as possible will be cached for any given remote; so all remotes will fetch tags. +# 4a. To avoid tag name collisions between remotes, tags will be fetched into per-remote namespaces. +# 5. Older omnicaches will be updated to meet the above design points when first encountered by the script. +# 5a. This implementation takes care that the underlying git construction is compatible (but not performant) when an +# older omnicache interacts with it. Older omnicache should not crash when encountering a newer omnicache directory +# but may take a lot of slow and unecessary actions to "re-initialize" it. + +OMNICACHE_VERSION = "0.11" + +# Prior to version 0.11, an "omnicache.yaml" file in the root of the omnicache repo was used to manage the cache +# configuration. Starting with 0.11, git config entries are used directly. If the file below is present, it will be +# removed to force re-initialization of the omnicache by older omnicache instances. (e.g. if an old branch with a prior +# implementation of omnicache is being built and uses the same cache directory) +PRE_0_11_OMNICACHE_FILENAME = "omnicache.yaml" + + +class Omnicache(): + """ Class for managing an omnicache instance""" + + def __init__(self, cachepath, create=False, convert=True): + """Initializes an omnicache + cachepath - path to the omnicache git repo + create - if True, and omnicache doesn't already exist at cachepath, create a new omnicache instance + convert - if True, and old (but compatible) omnicache already exists, at cachepath, convert it to this + version of Omnicache + """ + self.path = cachepath + self._InvalidateUrlLookupCache() + + (valid, isConversionCandidate) = self._ValidateOmnicache() + if (not valid): + if create and not isConversionCandidate: + self._InitOmnicache() + elif (convert and isConversionCandidate): + self._ConvertOmnicache() + else: + logging.critical("Omnicache at {0} not valid, and cannot create/convert.".format(cachepath)) + raise RuntimeError("Invalid cache path: {0}".format(cachepath)) + + def _ValidateOmnicache(self): + """Validates whether self.path has a valid omnicache instance. + Returns: (valid, convertible) where "valid" is True if a compatible Omnicache exists at self.path, and + "convertible" is True if an older Omnicache that can be converted exists at self.path. + """ + logging.info("Checking if {0} is valid omnicache.".format(self.path)) + if (not os.path.isdir(self.path)): + logging.debug("{0} does not exist - not valid (not convertible).".format(self.path)) + return (False, False) + out = StringIO() + ret = RunCmd("git", "rev-parse --is-bare-repository", workingdir=self.path, outstream=out) + if (ret != 0): + logging.debug("{0} error getting repo state - not valid (not convertible).".format(self.path)) + return (False, False) + if (out.getvalue().strip().lower() == "true"): + out = StringIO() + ret = RunCmd("git", "config --local omnicache.metadata.version", workingdir=self.path, outstream=out) + if (ret != 0): + logging.debug("{0} - error retrieving omnicache version. not valid (is convertible).".format(self.path)) + return (False, True) + if (out.getvalue().strip() == OMNICACHE_VERSION): + logging.debug("{0} - matching omnicache version. valid (convertible don't care).".format(self.path)) + return (True, True) + else: + logging.debug("{0} - non-matching omnicache version. not valid (is convertible).".format(self.path)) + return (False, True) + logging.debug("{0} - not a bare repo. not valid (not convertible).".format(self.path)) + return (False, False) + + def _InitOmnicache(self): + """Initializes a new omnicache instance""" + logging.critical("Initializing Omnicache in {0}".format(self.path)) + os.makedirs(self.path, exist_ok=True) + ret = RunCmd("git", "init --bare", workingdir=self.path) + if (ret != 0): + return ret + return RunCmd("git", + "config --local omnicache.metadata.version {0}".format(OMNICACHE_VERSION), + workingdir=self.path) + + def _ConvertOmnicache(self): + """Converts an existing bare git repo from a previous omnicache version to the current omnicache version""" + logging.info("Converting Omnicache in {0} to latest format.".format(self.path)) + if (os.path.exists(os.path.join(self.path, PRE_0_11_OMNICACHE_FILENAME))): + os.remove(os.path.join(self.path, PRE_0_11_OMNICACHE_FILENAME)) + remotes = Omnicache.GetRemotes(self.path) + logging.info("Renaming non-UUID remotes with UUID.") + for (name, _) in remotes.items(): + if (not Omnicache._IsValidUuid(name)): + logging.info("Converting remote {0} to UUID".format(name)) + # Rename the remote with a valid UUID + newName = str(uuid.uuid4()) + ret = RunCmd("git", "remote rename {0} {1}".format(name, newName), workingdir=self.path) + if (ret != 0): + return ret + # Remove previous fetch config entries and regenerate them. Proceed to create new ones even if it fails. + RunCmd("git", "config --local --unset-all remote.{0}.fetch".format(newName), workingdir=self.path) + RunCmd("git", + "config --local --add remote.{0}.fetch +refs/heads/*:refs/remotes/{0}/*".format(newName), + workingdir=self.path) + RunCmd("git", + "config --local --add remote.{0}.fetch refs/tags/*:refs/rtags/{0}/*".format(newName), + workingdir=self.path) + # Add the original name as a display name + RunCmd("git", + "config --local omnicache.{0}.displayname {1}".format(newName, name), + workingdir=self.path) + logging.info("Remote {0} converted to {1}.".format(name, newName)) + # delete any tags in the global name space (older omnicaches fetched all tags into the global tag namespace) + logging.info("Removing global tags") + out = StringIO() + RunCmd("git", "tag -l", workingdir=self.path, outstream=out) + tags = " ".join(out.getvalue().splitlines()) + RunCmd("git", "tag -d {0}".format(tags), workingdir=self.path) + # remove duplicates + logging.info("Removing remotes with duplicate URLs") + knownUrls = [] + remotes = Omnicache.GetRemotes(self.path) + for (name, url) in remotes.items(): + if (url not in knownUrls): + logging.info("Retaining remote {0} with unique URL {1}".format(name, url)) + knownUrls.append(url) + else: + logging.info("Removing remote {0} with duplicate URL {1}".format(name, url)) + RunCmd("git", "remote remove {0}".format(name), workingdir=self.path) + RunCmd("git", "config --local --unset omnicache.{0}.displayname".format(name), workingdir=self.path) + # write current omnicache version into cache + logging.info("Writing Omnicache version") + return RunCmd("git", + "config --local omnicache.metadata.version {0}".format(OMNICACHE_VERSION), + workingdir=self.path) + + def _RefreshUrlLookupCache(self): + """Refreshes the URL lookup cache""" + if (len(self.urlLookupCache) == 0): + logging.info("Regenerating URL lookup cache.") + out = StringIO() + ret = RunCmd("git", r"config --local --get-regexp remote\..*?\.url", workingdir=self.path, outstream=out) + if (ret != 0): + return None + # output is in the form: remote..url + for remote in out.getvalue().splitlines(): + self.urlLookupCache[remote.split()[1]] = ".".join(remote.split()[0].split(".")[1:-1]) + + def _InvalidateUrlLookupCache(self): + """Invalidates the URL lookup cache""" + logging.debug("Invalidating URL lookup cache.") + self.urlLookupCache = {} + + def _LookupRemoteForUrl(self, url): + """Returns the git remote name for the specified URL, or None if it doesn't exist""" + self._RefreshUrlLookupCache() + if (url in self.urlLookupCache): + return self.urlLookupCache[url] + return None - def __init__(self, absfilepath): - self.version = OmniCacheConfig.CONFIG_VERSION - self.filepath = absfilepath - self.last_change = datetime.datetime.strftime(datetime.datetime.now(), "%A, %B %d, %Y %I:%M%p") - if os.path.isfile(self.filepath): - self._Load() - else: - self.remotes = {} - - def _Load(self): - with open(self.filepath) as yml_file: - content = yaml.safe_load(yml_file) - - if "version" not in content: - raise Exception("Unsupported Config Version (None)") - elif content["version"] == self.version: - # parse yml into config data - self.remotes = {x["name"]: x for x in content["remotes"]} - self.last_change = content["last_change"] - else: - self._Transition(content) - - def Save(self): - data = {"version": self.version, "remotes": list(self.remotes.values()), - "last_change": datetime.datetime.strftime(datetime.datetime.now(), - "%A, %B %d, %Y %I:%M%p")} - with open(self.filepath, 'w') as outfile: - yaml.dump(data, outfile, default_flow_style=False) - - def _Transition(self, data): - # Add code here to move old config data to new format - raise Exception("Unsupported config data") - - def Log(self, level=logging.DEBUG): - logging.log(level, "OmniCache Config") - logging.log(level, " Filepath: {0}".format(self.filepath)) - logging.log(level, " Version: {%d}", self.version) - logging.log(level, " Remotes({%d})", len(self.remotes)) - for remote in self.remotes.values(): - rstring = "Name: {0} Url: {1} TagSync: {2}".format(remote["name"], remote["url"], ("tag" in remote)) - logging.log(level, " " + rstring) - - def Add(self, name, url, tags=False): - # check if this already exists - if self.Contains_url(url): - logging.warning("Skipping add this entry %s %s" % (name, url)) - return - # if the name already exists, we overwrite it - remote = {"name": name, "url": url} - if tags: - remote["tag"] = True - self.remotes[name] = remote - - def Contains_url(self, url): - for x in self.remotes.values(): - if x["url"] == url: - return True + def AddRemote(self, url, name=None): + """Adds a remote for the specified URL to the omnicache + url - URL for which to add a remote + name - (optional) - provides a "display name" to be associated with this remote. + """ + # if we already have this remote (i.e. a remote with this URL exists), then just update. + if (self._LookupRemoteForUrl(url) is not None): + return self.UpdateRemote(url, newName=name) + # otherwise create it. + logging.info("Adding new remote for url {0}".format(url)) + newName = str(uuid.uuid4()) + ret = RunCmd("git", "remote add {0} {1}".format(newName, url), workingdir=self.path) + if (ret != 0): + return ret + self._InvalidateUrlLookupCache() + # add display name, if specified + if (name is not None): + ret = RunCmd("git", + "config --local omnicache.{0}.displayname {1}".format(newName, name), + workingdir=self.path) + if (ret != 0): + return ret + # add a special fetch refspec to fetch remote tags into a per-remote local namespace. + return RunCmd("git", + "config --local --add remote.{0}.fetch refs/tags/*:refs/rtags/{0}/*".format(newName), + workingdir=self.path) + + def RemoveRemote(self, url): + """Removes the remote for the specified url from the cache""" + name = self._LookupRemoteForUrl(url) + if (name is None): + logging.critical("Failed to remove node for url {0}: such a remote does not exist.".format(url)) + return 1 + logging.info("Removing remote for url {0}".format(url)) + ret = RunCmd("git", "remote remove {0}".format(name), workingdir=self.path) + self._InvalidateUrlLookupCache() + return ret + + def UpdateRemote(self, oldUrl, newUrl=None, newName=None): + """Updates the remote + oldUrl - current url for the remote + newUrl - (optional) if specified, updates the remote to point to the new URL. + newName - (optional) if specified, updates the "displayname" for the remote. + """ + remote = self._LookupRemoteForUrl(oldUrl) + if (remote is None): + logging.critical("Failed to update node for url {0}: such a remote does not exist.".format(oldUrl)) + return 1 + if (newName is not None): + logging.info("Updating display name for url {0} to {1}".format(oldUrl, newName)) + ret = RunCmd("git", + "config --local omnicache.{0}.displayname {1}".format(remote, newName), + workingdir=self.path) + if (ret != 0): + return ret + if (newUrl is not None): + logging.info("Updating url {0} to {1}".format(oldUrl, newUrl)) + ret = RunCmd("git", "remote set-url {0} {1}".format(remote, newUrl), workingdir=self.path) + self._InvalidateUrlLookupCache() + if (ret != 0): + return ret + + return 0 + + def Fetch(self): + """Fetches all remotes""" + logging.info("Fetching all remotes.") + self._RefreshUrlLookupCache() + # Tricky: we pass no-tags here, since we set up custom fetch refs for tags on a per-remote basis. This prevents + # git from fetching the first set of tags into the global namespace. + return RunCmd("git", "fetch --all -j {0} --no-tags".format(len(self.urlLookupCache)), workingdir=self.path) + + def GetRemoteData(self): + """Gets Remote Data + Returns: a dict of dicts of the form: + {"":{"url":, "displayname":}} + note that "displayname" may not be present if the remote has no display name. + """ + logging.info("Retrieving all remote data") + self._RefreshUrlLookupCache() + remoteData = {} + for url in self.urlLookupCache.keys(): + remoteData[self.urlLookupCache[url]] = {"url": url} + + out = StringIO() + ret = RunCmd("git", + r"config --local --get-regexp omnicache\..*?\.displayname", + workingdir=self.path, + outstream=out) + if (ret != 0): + return remoteData + + for displayName in out.getvalue().splitlines(): + remoteName = displayName.split()[0].split(".")[1] + if (remoteName in remoteData.keys()): + remoteData[remoteName].update({"displayname": displayName.split()[1]}) + return remoteData + + def List(self): + """Prints the current set of remotes""" + print("List OMNICACHE content:\n") + remoteData = self.GetRemoteData() + if (len(remoteData) == 0): + print("No remotes.") + for (name, data) in remoteData.items(): + print("Id {0}: {1}".format(name, str(data))) + + @staticmethod + def GetRemotes(path): + """Gets the remotes for the git repository at the specified path. + Returns: dict of the form {:} + """ + logging.info("Retreiving remotes") + remotes = {} + out = StringIO() + + ret = RunCmd("git", "remote -v", workingdir=path, outstream=out) + if (ret != 0): + return remotes + + # Note: this loop assumes fetch and push URLs will be identical. If not, the last URL output will be the result. + for remote in out.getvalue().splitlines(): + remoteInfo = remote.split() + remotes[remoteInfo[0]] = remoteInfo[1] + + return remotes + + @staticmethod + def _IsValidUuid(val): + """Returns whether the input is valid UUID""" + try: + uuid.UUID(str(val)) + return True + except ValueError: + pass return False - def Contains_name(self, name): - for x in self.remotes.values(): - if x["name"] == name: - return True - return False - def GetNameForUrl(self, url): - for x in self.remotes.values(): - if (x["url"] == url): - return x["name"] - return None - - def Remove(self, del_name): - del self.remotes[del_name] - - def Contains(self, name): - return name in self.remotes +def ProcessInputConfig(omnicache, input_config): + """Adds a list of remotes from a YAML config file to the omnicache""" + logging.info("Adding remotes from {0}".format(input_config)) + with open(input_config) as icf: + content = yaml.safe_load(icf) + if "remotes" in content: + for remote in content["remotes"]: + # dict.get() used here to set name=None if no name specified in input cfg file. + omnicache.AddRemote(remote["url"], name=remote.get("name")) -OMNICACHE_VERSION = "0.10" -OMNICACHE_FILENAME = "omnicache.yaml" + return 0 -def CommonFilePathHandler(path): - ''' - function to check for absolute path and if not - concat with current dir and return absolute real path - ''' - if not os.path.isabs(path): - path = os.path.join(os.getcwd(), path) - path = os.path.realpath(path) - return path +def ScanDirectory(omnicache, scanpath): + """Recursively scans a directory for git repositories and adds remotes and submodule remotes to omnicache""" + logging.info("Scanning {0} for remotes to add.".format(scanpath)) + if (not os.path.isdir(scanpath)): + logging.critical("specified scan path is invalid.") + return -1 + + for (dirpath, dirnames, filenames) in os.walk(scanpath): + if (".git" in dirnames): + newRemotes = Omnicache.GetRemotes(dirpath) + for (name, url) in newRemotes.items(): + omnicache.AddRemote(url, name) + + if (".gitmodules" in filenames): + out = StringIO() + ret = RunCmd("git", "config --file .gitmodules --get-regexp url", workingdir=dirpath, outstream=out) + if (ret == 0): + for submodule in out.getvalue().splitlines(): + url = submodule.split()[1] + name = submodule.split()[0].split(".")[1] + omnicache.AddRemote(url, name) + return 0 -def AddEntriesFromConfig(config, input_config_file): - ''' - Add config entries found in the config file - to the omnicache. Entries already in omnicache - with the same name will be updated. - return - the number of entries added to cache - ''' +def Export(omnicache, exportPath): + """Exports omnicache configuration to YAML""" + logging.info("Exporting omnicache config for {0} to {1}".format(omnicache.path, exportPath)) + content = [] + for (name, data) in omnicache.GetRemoteData().items(): + remoteToWrite = {"url": data["url"]} + if ("displayname" in data): + remoteToWrite["name"] = data["displayname"] + else: + remoteToWrite["name"] = name + content.append(remoteToWrite) - count = 0 - with open(input_config_file) as yml_file: - content = yaml.safe_load(yml_file) - if "remotes" in content: - for remote in content["remotes"]: - currentRemoteName = config.GetNameForUrl(remote["url"]) - if (currentRemoteName is not None): - if (remote["name"] != currentRemoteName): - logging.debug( - "remote with name: {0} already in cache, renaming to {1}" - .format(currentRemoteName, remote["name"]) - ) - RemoveEntry(config, currentRemoteName) # remove here, then fall through to add entry below. - else: - logging.debug("remote with name: {0} already in cache".format(remote["name"])) - continue - if "tag" in remote: - AddEntry(config, remote["name"], remote["url"], bool(remote["tag"])) - else: - AddEntry(config, remote["name"], remote["url"]) - count += 1 - return (count, content["remotes"]) - - -def InitOmnicache(path): - logging.critical("Initialize Omnicache to {0}".format(path)) - os.makedirs(path) - return utility_functions.RunCmd("git", "--bare init", workingdir=path) - - -def AddEntry(config, name, url, tags=False): - logging.info("Adding remote ({0} : {1}) to Omnicache".format(name, url)) - - if config.Contains(name): - logging.info("Updating remote ({0} : {1}) in Omnicache".format(name, url)) - param = "remote set-url {0} {1}".format(name, url) - else: - logging.info("Adding remote ({0} : {1}) to Omnicache".format(name, url)) - param = "remote add {0} {1}".format(name, url) - - if(utility_functions.RunCmd("git", param) == 0): - config.Add(name, url, tags) - else: - logging.error("Failed to add remote for {0}".format(name)) - - -def RemoveEntry(config, name): - logging.info("Removing remote named {0}".format(name)) - param = "remote remove {0}".format(name) - if utility_functions.RunCmd("git", param) == 0: - config.Remove(name) - else: - logging.error("Failed to remove remote for {0}".format(name)) - - -def ConsistencyCheckCacheConfig(config): - ''' - Check the git remote list vs what is in the config file - Add remote to git for anything only in config - Add git remote from git into the config file (tags will be false) - - return - 0: success - non-zero: indicates an error - ''' - - logging.debug("start consistency check between git and omnicache config") - out = StringIO() - param = "remote -v" - gitnames = [] # list of git remote names as found in git repo - gitret = utility_functions.RunCmd("git", param, outstream=out) - - if gitret != 0: - logging.critical("Could not list git remotes") - return gitret - - lines = out.getvalue().split('\n') - out.close() - for line in lines: - line = line.strip() - if len(line) == 0: - # empty line - continue - git = line.split() - gitnames.append(git[0]) # save for later - if(not config.Contains(git[0])): - logging.warning("Found entry in git not in config. Name: {0} Url: {1}".format(git[0], git[1])) - config.Add(git[0], git[1]) - config.Save() - - gitnames = set(gitnames) - for remote in config.remotes.values(): - if(remote["name"] not in gitnames): - logging.warning("Found entry in config not in git. Name: {0} Url: {1}".format(remote["name"], - remote["url"])) - param = "remote add {0} {1}".format(remote["name"], remote["url"]) - utility_functions.RunCmd("git", param) + with open(exportPath, "w") as ocf: + yaml.dump({"remotes": content}, ocf) return 0 -def MultiFetch(remotes, tags=False, retries=3): - ''' - do git operations to fetch the specified entries. Supports fetching in parallel for speed, and supports retries to - add robustness to transient fetch failures. - - return - 0: success - non-zero: git command line error unresolved by retry. - ''' - param = "fetch" - param += " -j {0}".format(len(remotes)) - if (tags): - # might want to look at something more complex to avoid tag conflicts - # https://stackoverflow.com/questions/22108391/git-checkout-a-remote-tag-when-two-remotes-have-the-same-tag-name - # param += "+refs/heads/:refs/remotes/{0}/ +refs/tags/:refs/rtags/{0}/".format(name) - param += " --tags" - else: - param += " --no-tags" - param += " --multiple " + " ".join(remotes) - - for _ in range(retries): - ret = utility_functions.RunCmd("git", param) - if ret == 0: - break - return ret - - def get_cli_options(): parser = argparse.ArgumentParser(description='Tool to provide easy method create and manage the OMNICACHE', ) parser.add_argument(dest="cache_dir", help="path to an existing or desired OMNICACHE directory") @@ -267,17 +381,19 @@ def get_cli_options(): action="store_true", default=False) parser.add_argument("-l", "--list", dest="list", default=False, action="store_true", help="List config of OMNICACHE") - parser.add_argument("-a", "--add", dest="add", nargs='*', action="append", - help="Add config entry to OMNICACHE ", + parser.add_argument("-a", "--add", dest="add", nargs=2, action="append", + help="Add config entry to OMNICACHE ", default=[]) parser.add_argument("-c", "--configfile", dest="input_config_file", default=None, help="Add new entries from config file to OMNICACHE") + parser.add_argument("-e", "--exportConfig", dest="output_config_file", default=None, + help="Export current omnicache config as a yaml file") group = parser.add_mutually_exclusive_group() group.add_argument("-u", "--update", "--fetch", dest="fetch", action="store_true", help="Update the Omnicache. All cache changes also cause a fetch", default=False) group.add_argument("--no-fetch", dest="no_fetch", action="store_true", help="Prevent auto-fetch if implied by other arguments.", default=False) - parser.add_argument("-r", "--remove", dest="remove", nargs="?", action="append", + parser.add_argument("-r", "--remove", dest="remove", nargs=1, action="append", help="remove config entry from OMNICACHE ", default=[]) parser.add_argument('--version', action='version', version='%(prog)s ' + OMNICACHE_VERSION) parser.add_argument("--debug", dest="debug", help="Output all debug messages to console", @@ -293,11 +409,6 @@ def main(): console = edk2_logging.setup_console_logging(False) logger.addHandler(console) - ErrorCode = 0 - auto_fetch = False - input_config_remotes = None - - # arg parse args = get_cli_options() if args.debug: @@ -306,141 +417,82 @@ def main(): logging.info("Log Started: " + datetime.datetime.strftime( datetime.datetime.now(), "%A, %B %d, %Y %I:%M%p")) - args.cache_dir = CommonFilePathHandler(args.cache_dir) + args.cache_dir = os.path.realpath(os.path.abspath(args.cache_dir)) logging.debug("OMNICACHE dir: {0}".format(args.cache_dir)) - # input config file for adding new entries if args.input_config_file is not None: - args.input_config_file = CommonFilePathHandler(args.input_config_file) + args.input_config_file = os.path.realpath(os.path.abspath(args.input_config_file)) if not os.path.isfile(args.input_config_file): logging.critical("Invalid -c argument given. File ({0}) isn't valid".format(args.input_config_file)) return -4 logging.debug("Args: " + str(args)) - omnicache_config = None # config object - omnicache_config_file = os.path.join(args.cache_dir, OMNICACHE_FILENAME) - + omnicache = None + auto_fetch = False + # new: initialize omnicache and error if it already exists if args.new: if os.path.isdir(args.cache_dir): logging.critical("--new argument given but OMNICACHE path already exists!") return -1 - InitOmnicache(args.cache_dir) + omnicache = Omnicache(args.cache_dir, create=True) auto_fetch = True - if args.init: - if os.path.isdir(args.cache_dir): - if os.path.isfile(omnicache_config_file): - logging.debug("OMNICACHE already exists. No need to initialize") - else: - InitOmnicache(args.cache_dir) + # init: initialize omnicache if not already initialized. + if args.init and omnicache is None: + omnicache = Omnicache(args.cache_dir, create=True, convert=True) auto_fetch = True - # Check to see if exists - if not os.path.isdir(args.cache_dir): - logging.critical("OMNICACHE path invalid.") - return -2 - - # load config - omnicache_config = OmniCacheConfig(omnicache_config_file) + # Other args require omnicache, so check that it exists. + if omnicache is None: + omnicache = Omnicache(args.cache_dir) - os.chdir(args.cache_dir) + # add: add new source(s) to omnicache from command line arg. + if (len(args.add) > 0): + for (name, url) in args.add: + ret = omnicache.AddRemote(url, name) + if (ret != 0): + return -3 + auto_fetch = True - if(len(args.add) > 0): + # config: add or update sources(s) from config file. + if (args.input_config_file is not None): + ret = ProcessInputConfig(omnicache, args.input_config_file) + if (ret != 0): + return -4 auto_fetch = True - for inputdata in args.add: - if len(inputdata) == 2: - AddEntry(omnicache_config, inputdata[0], inputdata[1]) - elif len(inputdata) == 3: - AddEntry(omnicache_config, inputdata[0], inputdata[1], bool(inputdata[2])) - else: - logging.critical("Invalid Add Entry. Should be ") - return -3 - if(args.input_config_file is not None): - (count, input_config_remotes) = AddEntriesFromConfig(omnicache_config, args.input_config_file) - if(count > 0): - auto_fetch = True + # remove: remove source(s) from omnicache as specified by command line arg. + if (len(args.remove) > 0): + for url in args.remove: + ret = omnicache.RemoveRemote(url) + if (ret != 0): + return -4 + + # scan: recursively scan the given directory and add all repos and submodules + if (args.scan is not None): + ret = ScanDirectory(omnicache, args.scan) + if (ret != 0): + return -5 + auto_fetch = True - if len(args.remove) > 0: - for inputdata in args.remove: - RemoveEntry(omnicache_config, inputdata) + # fetch: update the omnicache with objects from its remotes. + # Note: errors are ignored here, since transient network failures may occur that prevent cache update. Those just + # mean the omnicache may be a a little stale which should not be fatal to users of the cache. + if(args.fetch or (auto_fetch and not args.no_fetch)): + omnicache.Fetch() - # if we need to scan - if args.scan is not None: - logging.critical("OMNICACHE is scanning the folder %s.") - if not os.path.isdir(args.scan): - logging.error("Invalid scan directory") - return -4 - reposFound = dict() - # iterate through top level directories - dirs = os.listdir(args.scan) - while len(dirs) > 0: - item = dirs.pop() - itemDir = os.path.join(args.scan, item) - if os.path.isfile(itemDir): - continue - logging.info("Scanning %s for a git repo" % item) - gitDir = os.path.join(itemDir, ".git") - # Check if it's a directory or a file (submodules usually have a file instead of a folder) - if os.path.isdir(gitDir) or os.path.isfile(gitDir): - repo = Repo(itemDir) - if repo.url: - if repo.url not in reposFound: - reposFound[repo.url] = item - else: - logging.warning("Skipping previously found repo at %s with url %s" % (item, repo.url)) - else: # if repo.url is none - logging.error("Url not found for git repo at: %s" % itemDir) - # check for submodules - if repo.submodules: - for submodule in repo.submodules: - dirs.append(os.path.join(item, submodule)) - else: - logging.error("Git repo not found at %s" % itemDir) - # go through all the URLs found - for url in reposFound: - omnicache_config.Add(reposFound[url], url) + # list: print out the omnicache contents. + if (args.list): + omnicache.List() - omnicache_config.Save() + # export: + if (args.output_config_file): + ret = Export(omnicache, args.output_config_file) + if (ret != 0): + return -6 - if(args.fetch or (auto_fetch and not args.no_fetch)): - logging.critical("Updating OMNICACHE") - # as an optimization, if input config file provided, only fetch remotes specified in input config - # otherwise, fetch all remotes in the OmniCache - if (input_config_remotes is not None): - remotes = [x["name"] for x in input_config_remotes] - else: - remotes = [omnicache_config.remotes.keys()] - - # Fetch no-tags and tags separately. - no_tags = [omnicache_config.remotes[x]["name"] for x in remotes if "tag" not in omnicache_config.remotes[x]] - with_tags = [omnicache_config.remotes[x]["name"] for x in remotes if "tag" in omnicache_config.remotes[x]] - if (len(no_tags) > 0): - ret = MultiFetch(no_tags, tags=False) - if (ret != 0) and (ErrorCode == 0): - ErrorCode = ret - if (len(with_tags) > 0): - ret = MultiFetch(with_tags, tags=True) - if (ret != 0) and (ErrorCode == 0): - ErrorCode = ret - - if args.list: - ret = ConsistencyCheckCacheConfig(omnicache_config) - if (ret != 0) and (ErrorCode == 0): - ErrorCode = ret - print("List OMNICACHE content\n") - if len(omnicache_config.remotes) == 0: - logging.warning("No Remotes to show") - - for remote in omnicache_config.remotes.values(): - rstring = "Name: {0}\n Url: {1}\n Sync Tags: {2}".format(remote["name"], remote["url"], ("tag" in remote)) - print(" " + rstring + "\n\n") - - print("To use your OMNICACHE set the env variable:") - print("set OMNICACHE_PATH=" + args.cache_dir) - - return ErrorCode + return 0 if __name__ == '__main__': diff --git a/edk2toolext/tests/test_omnicache.py b/edk2toolext/tests/test_omnicache.py index 3175a574..7301f275 100644 --- a/edk2toolext/tests/test_omnicache.py +++ b/edk2toolext/tests/test_omnicache.py @@ -11,6 +11,8 @@ import logging import tempfile import shutil +import yaml +import sys from io import StringIO from edk2toolext import omnicache from edk2toollib import utility_functions @@ -27,7 +29,13 @@ def prep_workspace(): test_dir = tempfile.mkdtemp() logging.debug("temp dir is: %s" % test_dir) else: - shutil.rmtree(test_dir) + for _ in range(3): + try: + shutil.rmtree(test_dir) + except OSError as err: + logging.warning(f"Failed to fully remove {test_dir}: {err}") + else: + break test_dir = tempfile.mkdtemp() current_dir = os.path.abspath(os.getcwd()) @@ -39,7 +47,13 @@ def clean_workspace(): return if os.path.isdir(test_dir): - shutil.rmtree(test_dir) + for _ in range(3): + try: + shutil.rmtree(test_dir) + except OSError as err: + logging.warning(f"Failed to fully remove {test_dir}: {err}") + else: + break test_dir = None @@ -57,84 +71,416 @@ def setUpClass(cls): def tearDownClass(cls): clean_workspace() - def test_basic_init(self): - valueabs = os.path.join(os.path.abspath(os.getcwd()), "test", "test2") - result = omnicache.CommonFilePathHandler(valueabs) - assert(result == valueabs) + def test_omnicache_new(self): + testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") + + # create a new cache + oc = omnicache.Omnicache(testcache, create=True, convert=False) + + # check that the new cache was created as a bare repo + out = StringIO() + gitret = utility_functions.RunCmd("git", "rev-parse --is-bare-repository", workingdir=testcache, outstream=out) + assert(gitret == 0) + assert(out.getvalue().strip().lower() == "true") + + # check that it has the right metadata + out = StringIO() + gitret = utility_functions.RunCmd("git", + "config --local omnicache.metadata.version", + workingdir=testcache, + outstream=out) + assert(gitret == 0) + assert(out.getvalue().strip().lower() == omnicache.OMNICACHE_VERSION) + + # check that omnicache thinks it's valid + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + # remove the metadata to simulate an older omnicache that is candidate for conversion + gitret = utility_functions.RunCmd("git", + "config --local --unset omnicache.metadata.version", + workingdir=testcache) + assert(gitret == 0) + + # check that omnicache thinks it's not valid but convertible. + (valid, convertible) = oc._ValidateOmnicache() + assert(not valid) + assert(convertible) + + # attempt to create a new cache - test repo is valid git repo, but doesn't have the meta. + with self.assertRaises(RuntimeError): + oc = omnicache.Omnicache(testcache, create=False, convert=False) + + # set the metadata version to an unexpected value + gitret = utility_functions.RunCmd( + "git", + "config --local omnicache.metadata.version {0}".format(omnicache.OMNICACHE_VERSION + "x"), + workingdir=testcache) + assert(gitret == 0) + + # check that omnicache thinks it's not valid but convertible. + (valid, convertible) = oc._ValidateOmnicache() + assert(not valid) + assert(convertible) + + # attempt to create a new cache - test repo is valid git repo, but doesn't have the expected meta. + with self.assertRaises(RuntimeError): + oc = omnicache.Omnicache(testcache, create=False, convert=False) - def test_commonfilepathhandler_real(self): - valueabs = os.path.join(os.path.abspath(os.getcwd()), "test", "test2") - result = omnicache.CommonFilePathHandler(os.path.join(valueabs, "..", "test2")) - assert(result == valueabs) + # now make it a non-bare (but technically valid) git repo + gitret = utility_functions.RunCmd("git", "config --local core.bare false", workingdir=testcache) + assert(gitret == 0) - def test_commonfilepathhandler_relative(self): - valueabs = os.path.join(os.path.abspath(os.getcwd()), "test", "test2") - result = omnicache.CommonFilePathHandler(os.path.join("test", "test2")) - assert(result == valueabs) + # check that omnicache thinks it's not valid and not convertible. + (valid, convertible) = oc._ValidateOmnicache() + assert(not valid) + assert(not convertible) - def test_omnicache_init(self): + # attempt to create a new cache - test repo is valid non-bare git repo + with self.assertRaises(RuntimeError): + oc = omnicache.Omnicache(testcache, create=False, convert=False) + + # make it it a non-valid git repo + os.remove(os.path.join(testcache, "HEAD")) + + # check that omnicache thinks it's not valid and not convertible. + (valid, convertible) = oc._ValidateOmnicache() + assert(not valid) + assert(not convertible) + + # attempt to create a new cache - test repo is not valid git repo + with self.assertRaises(RuntimeError): + oc = omnicache.Omnicache(testcache, create=False, convert=False) + + def test_omnicache_convert(self): testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") - testconfigs = [ - { - "cfgfile": os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcfg.yaml"), - "name": "openssl", - "url": "https://github.com/openssl/openssl.git", - "tag": "true" - }, - { - "cfgfile": os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcfg2.yaml"), - "name": "openssl", - "url": "https://foobar.com/openssl/openssl.git", - "tag": "true" - } - ] - - for testconfig in testconfigs: - currentdir = os.path.abspath(os.getcwd()) - with open(testconfig["cfgfile"], "w") as configyaml: - configyaml.write("remotes:\n") - configyaml.write("- name: {0}\n".format(testconfig["name"])) - configyaml.write(" url: {0}\n".format(testconfig["url"])) - configyaml.write(" tag: {0}\n".format(testconfig["tag"])) - - omnicache_config_file = os.path.join(testcache, omnicache.OMNICACHE_FILENAME) - if(os.path.isdir(testcache)): - if(os.path.isfile(omnicache_config_file)): - logging.debug("OMNICACHE already exists. No need to initialize") + + # create a new cache + oc = omnicache.Omnicache(testcache, create=True, convert=False) + + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + # remove the metadata to simulate an older omnicache that is candidate for conversion + gitret = utility_functions.RunCmd("git", + "config --local --unset omnicache.metadata.version", + workingdir=testcache) + assert(gitret == 0) + + # add an empty file to simulate the old config yaml + with open(os.path.join(testcache, omnicache.PRE_0_11_OMNICACHE_FILENAME), "w") as yf: + yf.write("Not A Real YAML File") + + # confirm that _ValidateOmnicache correctly identifies cache state + (valid, convertible) = oc._ValidateOmnicache() + assert(not valid) + assert(convertible) + + # add a traditionally-named remote to the cache to simulate an older omnicache + gitret = utility_functions.RunCmd( + "git", + "remote add pytools-ext https://github.com/tianocore/edk2-pytool-extensions.git", + workingdir=testcache) + assert(gitret == 0) + + # add a second copy of the remote remote with a different name to the cache to simulate a duplicate + gitret = utility_functions.RunCmd( + "git", + "remote add pytools-ext2 https://github.com/tianocore/edk2-pytool-extensions.git", + workingdir=testcache) + assert(gitret == 0) + + # fetch the remote and its tags to populate the cache for conversion. + gitret = utility_functions.RunCmd("git", "fetch pytools-ext --tags", workingdir=testcache) + assert(gitret == 0) + + # confirm that _ValidateOmnicache still correctly identifies cache state + (valid, convertible) = oc._ValidateOmnicache() + assert(not valid) + assert(convertible) + + # re-create the omnicache object to trigger conversion + oc = omnicache.Omnicache(testcache, create=False, convert=True) + + # validate converted cache + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + # verify that old config file was deleted. + assert(not os.path.exists(os.path.join(testcache, omnicache.PRE_0_11_OMNICACHE_FILENAME))) + + # verify that the traditionally-named remote is no longer in the cache (it should have been renamed with a UUID) + remotes = omnicache.Omnicache.GetRemotes(testcache) + assert("pytools-ext" not in remotes.keys()) + + # verify that the URL still is in the cache + assert("https://github.com/tianocore/edk2-pytool-extensions.git" in remotes.values()) + + # verify that there is exactly one entry in the cache for this URL (duplicates should be removed) + assert(sum(url == "https://github.com/tianocore/edk2-pytool-extensions.git" for url in remotes.values()) == 1) + + # verify that the former remote name is now the "omnicache display name" + for (name, url) in remotes.items(): + if (url == "https://github.com/tianocore/edk2-pytool-extensions.git"): + out = StringIO() + gitret = utility_functions.RunCmd("git", + f"config --local omnicache.{name}.displayname", + workingdir=testcache, + outstream=out) + assert(gitret == 0) + displayname = out.getvalue().strip() + # if there are duplicates, which displayname is chosen is arbitrary, so allow either. + assert(displayname == "pytools-ext" or displayname == "pytools-ext2") + + # verify that there are no global tags in the root. + out = StringIO() + gitret = utility_functions.RunCmd("git", "tag -l", workingdir=testcache, outstream=out) + assert(gitret == 0) + assert(out.getvalue().strip() == "") + + def test_omnicache_add_remove(self): + testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") + + # create a new cache + oc = omnicache.Omnicache(testcache, create=True, convert=False) + + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + # add a remote with display name + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions.git", name="pytools-ext") + assert(ret == 0) + + assert(len(omnicache.Omnicache.GetRemotes(testcache).keys()) == 1) + assert("https://github.com/tianocore/edk2-pytool-extensions.git" + in omnicache.Omnicache.GetRemotes(testcache).values()) + + # get the remote UUID + remoteName = oc._LookupRemoteForUrl("https://github.com/tianocore/edk2-pytool-extensions.git") + assert(remoteName is not None) + + # confirm that remote data is as expected + remoteData = oc.GetRemoteData() + assert(len(remoteData.keys()) == 1) + assert(remoteData[remoteName]["url"] == "https://github.com/tianocore/edk2-pytool-extensions.git") + assert(remoteData[remoteName]["displayname"] == "pytools-ext") + + # remove the remote and make sure it is gone + ret = oc.RemoveRemote("https://github.com/tianocore/edk2-pytool-extensions.git") + assert(ret == 0) + assert(len(omnicache.Omnicache.GetRemotes(testcache).keys()) == 0) + assert("https://github.com/tianocore/edk2-pytool-extensions.git" + not in omnicache.Omnicache.GetRemotes(testcache).values()) + + # add a remote without display name + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions.git") + assert(ret == 0) + + assert(len(omnicache.Omnicache.GetRemotes(testcache).keys()) == 1) + assert("https://github.com/tianocore/edk2-pytool-extensions.git" + in omnicache.Omnicache.GetRemotes(testcache).values()) + + # get the remote UUID + remoteName = oc._LookupRemoteForUrl("https://github.com/tianocore/edk2-pytool-extensions.git") + assert(remoteName is not None) + + # confirm that remote data is as expected + remoteData = oc.GetRemoteData() + assert(len(remoteData.keys()) == 1) + assert(remoteData[remoteName]["url"] == "https://github.com/tianocore/edk2-pytool-extensions.git") + assert("displayname" not in remoteData[remoteName]) + + # add a remote that already exists (with new display name) and make sure it is treated as an update. + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions.git", name="pytools-ext2") + assert(ret == 0) + + assert(len(omnicache.Omnicache.GetRemotes(testcache).keys()) == 1) + assert("https://github.com/tianocore/edk2-pytool-extensions.git" + in omnicache.Omnicache.GetRemotes(testcache).values()) + + # get the remote UUID + remoteName = oc._LookupRemoteForUrl("https://github.com/tianocore/edk2-pytool-extensions.git") + assert(remoteName is not None) + + # confirm that remote data is as expected + remoteData = oc.GetRemoteData() + assert(len(remoteData.keys()) == 1) + assert(remoteData[remoteName]["url"] == "https://github.com/tianocore/edk2-pytool-extensions.git") + assert(remoteData[remoteName]["displayname"] == "pytools-ext2") + + # attempt to remove a non-existent remote + ret = oc.RemoveRemote("http://thisisnot.com/good.git") + assert (ret != 0) + + def test_omnicache_update(self): + testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") + + # create a new cache + oc = omnicache.Omnicache(testcache, create=True, convert=False) + + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + # add a remote with display name + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions.git", name="pytools-ext") + assert(ret == 0) + + assert(len(omnicache.Omnicache.GetRemotes(testcache).keys()) == 1) + assert("https://github.com/tianocore/edk2-pytool-extensions.git" + in omnicache.Omnicache.GetRemotes(testcache).values()) + + # get the remote UUID + remoteName = oc._LookupRemoteForUrl("https://github.com/tianocore/edk2-pytool-extensions.git") + assert(remoteName is not None) + + # update the URL and displayname of the remote + ret = oc.UpdateRemote( + "https://github.com/tianocore/edk2-pytool-extensions.git", + newUrl="https://github.com/tianocore/edk2-pytool-extensions2.git", + newName="pytools-ext2") + assert(ret == 0) + + assert(len(omnicache.Omnicache.GetRemotes(testcache).keys()) == 1) + assert("https://github.com/tianocore/edk2-pytool-extensions2.git" + in omnicache.Omnicache.GetRemotes(testcache).values()) + + # make sure UUID didn't change + assert(remoteName == oc._LookupRemoteForUrl("https://github.com/tianocore/edk2-pytool-extensions2.git")) + + # confirm that remote data is as expected + remoteData = oc.GetRemoteData() + assert(len(remoteData.keys()) == 1) + assert(remoteData[remoteName]["url"] == "https://github.com/tianocore/edk2-pytool-extensions2.git") + assert(remoteData[remoteName]["displayname"] == "pytools-ext2") + + # update a non-existent URL in the cache and confirm error is returned + ret = oc.UpdateRemote("https://not.a.real.url.com/git") + assert (ret != 0) + + def test_omnicache_fetch(self): + testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") + + # create a new cache + oc = omnicache.Omnicache(testcache, create=True, convert=False) + + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + # add a remote with display name + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions.git", name="pytools-ext") + assert(ret == 0) + + # fetch the remote + ret = oc.Fetch() + assert (ret == 0) + + # get the remote UUID + remoteName = oc._LookupRemoteForUrl("https://github.com/tianocore/edk2-pytool-extensions.git") + assert(remoteName is not None) + + # verify that branches were fetched into the omnicache + assert(len(os.listdir(os.path.join(testcache, "refs", "remotes", remoteName))) != 0) + + # verify that tags were fetched into the omnicache + assert(len(os.listdir(os.path.join(testcache, "refs", "rtags", remoteName))) != 0) + + def test_omnicache_list(self): + testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") + + # create a new cache + oc = omnicache.Omnicache(testcache, create=True, convert=False) + + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + oc.List() + + # add a remote with display name + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions.git", name="pytools-ext") + assert(ret == 0) + + oc.List() + + def test_config_files(self): + testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") + testyaml = os.path.join(os.path.abspath(os.getcwd()), test_dir, "cfg.yaml") + + # create a new cache + oc = omnicache.Omnicache(testcache, create=True, convert=False) + + (valid, _) = oc._ValidateOmnicache() + assert(valid) + + # add a remote with display name + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions.git", name="pytools-ext") + assert(ret == 0) + + # add a remote with no display name + ret = oc.AddRemote("https://github.com/tianocore/edk2-pytool-extensions2.git") + assert(ret == 0) + + # export yaml cfg + ret = omnicache.Export(oc, testyaml) + assert(ret == 0) + + # inspect the yaml for correctness + with open(testyaml) as yf: + content = yaml.safe_load(yf) + + assert("remotes" in content) + assert(len(content["remotes"]) == 2) + for remote in content["remotes"]: + if (remote["url"] == "https://github.com/tianocore/edk2-pytool-extensions.git"): + assert (remote["name"] == "pytools-ext") + elif (remote["url"] == "https://github.com/tianocore/edk2-pytool-extensions2.git"): + assert (omnicache.Omnicache._IsValidUuid(remote["name"])) + # remove the "display name" for input test below + del remote["name"] + else: + # not one of the URLs we populated above = bad. + assert(remote["url"] not in remote.values()) + + # save the yaml file (since we removed one of the displaynames) + with open(testyaml, "w") as yf: + yaml.dump(content, yf) + + # remove the remotes + ret = oc.RemoveRemote("https://github.com/tianocore/edk2-pytool-extensions.git") + assert (ret == 0) + ret = oc.RemoveRemote("https://github.com/tianocore/edk2-pytool-extensions2.git") + assert (ret == 0) + + # confirm we have no remotes + assert(len(oc.GetRemoteData()) == 0) + + # import yaml cfg + ret = omnicache.ProcessInputConfig(oc, testyaml) + assert(ret == 0) + + # check resulting omnicache config + for remote in oc.GetRemoteData().values(): + if (remote["url"] == "https://github.com/tianocore/edk2-pytool-extensions.git"): + assert (remote["displayname"] == "pytools-ext") + elif (remote["url"] == "https://github.com/tianocore/edk2-pytool-extensions2.git"): + assert ("displayname" not in remote) else: - omnicache.InitOmnicache(testcache) - - omnicache_config = omnicache.OmniCacheConfig(omnicache_config_file) - os.chdir(testcache) - - (count, input_config_remotes) = omnicache.AddEntriesFromConfig(omnicache_config, testconfig["cfgfile"]) - - assert(count == 1) - assert(input_config_remotes is not None) - assert(input_config_remotes[0]["name"] == testconfig["name"]) - assert(input_config_remotes[0]["url"] == testconfig["url"]) - - omnicache_config.Save() - - # check that cache properly initialized/updated - out = StringIO() - param = "remote -v" - gitret = utility_functions.RunCmd("git", param, outstream=out) - assert(gitret == 0) - - lines = out.getvalue().split('\n') - out.close() - assert (len(lines) > 0) - for line in lines: - line = line.strip() - if(len(line) == 0): - # empty line - continue - git = line.split() - assert(git[0] == input_config_remotes[0]["name"]) - assert(git[1] == input_config_remotes[0]["url"]) - - os.chdir(currentdir) + # not one of the URLs we populated above = bad. + assert(remote["url"] not in remote.values()) + + def test_omnicache_main(self): + testcache = os.path.join(os.path.abspath(os.getcwd()), test_dir, "testcache") + # shameless code coverage play + oldargs = sys.argv + sys.argv = ["omnicache", "--init", testcache] + ret = omnicache.main() + assert(ret == 0) + sys.argv = ["omnicache", "--new", testcache] + ret = omnicache.main() + assert(ret != 0) + sys.argv = ["omnicache", "--scan", testcache, testcache] + ret = omnicache.main() + assert(ret == 0) + sys.argv = oldargs if __name__ == '__main__':