From 41d641b555812cc98c3fa47353f99cd47068e5c9 Mon Sep 17 00:00:00 2001
From: benoit74
Date: Tue, 14 May 2024 07:12:05 +0000
Subject: [PATCH 1/2] Add watcher to dev stack

---
 dev/README.md          | 16 +++++++++++++++-
 dev/docker-compose.yml | 13 +++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/dev/README.md b/dev/README.md
index e7c1f8f8..ec3cf4f1 100644
--- a/dev/README.md
+++ b/dev/README.md
@@ -41,6 +41,10 @@ This container is the main worker container, responsible to start tasks. It is c
 This container is a sample task executor. It is commented by default.
 
+### watcher
+
+This container is a StackExchange dumps watcher.
+
 ## Instructions
 
 First start the Docker-Compose stack:
 
@@ -169,4 +173,14 @@ docker exec -it zf_receiver /contrib/create-warehouse-paths.sh
 
 You can start a task manager manually simply by requesting a task in the UI and starting it manually (see above).
 
-Once the task is reserved for the `test_worker`, you can modify the `task_worker` container `command` in `docker-compose.yml` with this ID, uncomment the `task_worker` section and start it.
\ No newline at end of file
+Once the task is reserved for the `test_worker`, you can modify the `task_worker` container `command` in `docker-compose.yml` with this ID, uncomment the `task_worker` section and start it.
+
+### start a StackExchange dumps watcher
+
+Uncomment the watcher configuration in `docker-compose.yml`.
+
+Set up a proper S3 URL to a test bucket (including credentials in the URL).
+
+By default, only the beer.stackexchange.com domain dump is considered.
+
+Tasks are scheduled as in prod when a new dump is downloaded.
diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml
index d7b4f2b5..9df592d2 100644
--- a/dev/docker-compose.yml
+++ b/dev/docker-compose.yml
@@ -95,6 +95,19 @@ services:
     depends_on:
       - backend
 
+  # # uncomment this only if you want to run a StackExchange dumps watcher
+  # watcher:
+  #   build: ../watcher
+  #   container_name: zf_watcher
+  #   command: watcher --only beer.stackexchange.com --debug
+  #   environment:
+  #     - ZIMFARM_API_URL=http://backend:8000/v1
+  #     - ZIMFARM_USERNAME=admin
+  #     - ZIMFARM_PASSWORD=admin
+  #     - S3_URL=https://s3.us-west-1.wasabisys.com/?keyId=&secretAccessKey=&bucketName=org-kiwix-dev-stackexchange
+  #   depends_on:
+  #     - backend
+
   # # uncomment this only if you want to run a worker manager
   # worker_mgr:
   #   build:

From d780afbaee43d783a3b658d240e8cbee75a77a91 Mon Sep 17 00:00:00 2001
From: benoit74
Date: Tue, 14 May 2024 06:43:53 +0000
Subject: [PATCH 2/2] StackExchange watcher support of intra-month dump updates

---
 watcher/watcher.py | 42 ++++++++++++++++++++----------------------
 1 file changed, 20 insertions(+), 22 deletions(-)

diff --git a/watcher/watcher.py b/watcher/watcher.py
index a0085539..e5d7bc47 100644
--- a/watcher/watcher.py
+++ b/watcher/watcher.py
@@ -63,14 +63,12 @@
 logging.getLogger(logger_name).setLevel(logging.WARNING)
 
 
-def get_version_for(url):
-    """casted datetime of the Last-Modified header for an URL"""
+def get_last_modified_for(url):
+    """the Last-Modified header for a URL"""
     with requests.head(url, allow_redirects=True) as resp:
         if resp.status_code == 404:
             raise FileNotFoundError(url)
-        return datetime.datetime.strptime(
-            resp.headers.get("last-modified"), "%a, %d %b %Y %H:%M:%S GMT"
-        ).strftime("%Y-%m")
+        return resp.headers.get("last-modified")
 
 
 def get_token(api_url, username, password):
@@ -243,17 +241,16 @@ def zimfarm_credentials_ok(self):
         return success
 
     def retrieve_all_sites(self):
-        """version, list of domain names for which there's a dump online"""
+        """list of domain names for which there's a dump online"""
         url = f"{DOWNLOAD_URL}/Sites.xml"
         resp = requests.get(url)
         parser = XMLtoDict()
         sites = parser.parse(resp.text).get("sites", {}).get("row", [])
-        version = get_version_for(url)
 
         def _filter(item):
             return item in self.only if self.only else 1
 
-        return version, filter(
+        return filter(
             _filter, [re.sub(r"^https?://", "", site.get("@Url", "")) for site in sites]
         )
 
@@ -343,7 +340,7 @@ def schedule_in_zimfarm(self, domain):
 
         return payload.get("requested")
 
-    def update_file(self, key, schedule_upon_success):
+    def update_file(self, key, schedule_upon_success, last_modified):
         """Do an all-steps update of that file as we know there's a new one avail."""
         domain = re.sub(r".7z$", "", key)
         prefix = f" [{domain}]"
@@ -357,18 +354,13 @@ def update_file(self, key, schedule_upon_success):
 
         if self.s3_storage.has_object(key):
             logger.info(f"{prefix} Removing object in S3")
-            obsolete = self.s3_storage.get_object_stat(key).meta.get("version")
+            obsolete = self.s3_storage.get_object_stat(key).meta.get("lastmodified")
             self.s3_storage.delete_object(key)
-            logger.info(f"{prefix} Removed object (was version {obsolete})")
+            logger.info(f"{prefix} Removed object (was from {obsolete})")
 
         logger.info(f"{prefix} Downloading…")
         url = f"{DOWNLOAD_URL}/{key}"
         fpath = self.work_dir / key
-        try:
-            version = get_version_for(url)
-        except FileNotFoundError:
-            logger.error(f"{url} is missing upstream. Skipping.")
-            return
 
         wget = subprocess.run(
             ["/usr/bin/env", "wget", "-O", fpath, url],
@@ -383,7 +375,9 @@ def update_file(self, key, schedule_upon_success):
         logger.info(f"{prefix} Download completed")
 
         logger.info(f"{prefix} Uploading to S3…")
-        self.s3_storage.upload_file(fpath=fpath, key=key, meta={"version": version})
+        self.s3_storage.upload_file(
+            fpath=fpath, key=key, meta={"lastmodified": last_modified}
+        )
         logger.info(f"{prefix} Uploaded")
 
         fpath.unlink()
@@ -400,9 +394,9 @@ def update_file(self, key, schedule_upon_success):
             logger.warning(f"{prefix} couldn't schedule recipe(s)")
 
     def check_and_go(self):
-        logger.info("Checking StackExchange version…")
-        version, domains = self.retrieve_all_sites()
-        logger.info(f"Latest online version: {version}. Comparing with S3…")
+        logger.info("Getting list of SE domains")
+        domains = self.retrieve_all_sites()
+        logger.info("Grabbing online versions and comparing with S3…")
 
         self.domains_futures = {}  # future: key
         self.domains_executor = cf.ThreadPoolExecutor(max_workers=self.nb_threads)
@@ -428,10 +422,13 @@ def check_and_go(self):
             keys = [f"{domain}.7z"]
 
         for key in keys:
+            url = f"{DOWNLOAD_URL}/{key}"
+
+            last_modified = get_last_modified_for(url)
             if not self.s3_storage.has_object_matching(
-                key, meta={"version": version}
+                key, meta={"lastmodified": last_modified}
             ):
-                logger.info(f"  [+] {key}")
+                logger.info(f"  [+] {key} (from {last_modified})")
 
                 # update shall trigger a new recipe schedule on Zimfarm upon compl.
                 # - unless requested not to
@@ -444,6 +441,7 @@ def check_and_go(self):
                 self.update_file,
                 key=key,
                 schedule_upon_success=schedule_upon_success,
+                last_modified=last_modified,
             )
             self.domains_futures.update({future: key})
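
Note (illustration only, not part of either patch): the core change in the second patch is to stop collapsing the dump's Last-Modified header into a YYYY-MM version string and instead store and compare the raw header value, so a dump re-published within the same month is still detected. Below is a minimal, self-contained sketch of that comparison; the dump URL and the previously stored value are placeholders, and dump_needs_update is a hypothetical helper, not a function from the watcher.

import requests


def get_last_modified_for(url):
    """Raw Last-Modified header for a URL (same approach as the patched watcher)."""
    with requests.head(url, allow_redirects=True) as resp:
        if resp.status_code == 404:
            raise FileNotFoundError(url)
        return resp.headers.get("last-modified")


def dump_needs_update(url, stored_last_modified):
    """True when the upstream dump differs from the copy recorded earlier."""
    return get_last_modified_for(url) != stored_last_modified


if __name__ == "__main__":
    # Placeholder URL and previously stored value, for illustration only.
    dump_url = "https://example.org/dumps/beer.stackexchange.com.7z"
    print(dump_needs_update(dump_url, "Tue, 14 May 2024 06:43:53 GMT"))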