Merge pull request #967 from openzim/watcher_last_modified
StackExchange watcher: sync dumps whenever they are modified
benoit74 authored May 14, 2024
2 parents d0cf97b + d780afb commit bc1775a
Showing 3 changed files with 48 additions and 23 deletions.
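In short: the watcher previously cast each dump's Last-Modified header into a "%Y-%m" version string; it now keeps the raw header value, stores it as object metadata on S3, and re-syncs a dump whenever the upstream value differs. A minimal standalone sketch of that check, condensed from the watcher.py diff below (the archive.org URL and the stored value are illustrative assumptions, not part of this commit):

```python
import requests

def get_last_modified_for(url):
    """the raw Last-Modified header for a URL"""
    with requests.head(url, allow_redirects=True) as resp:
        if resp.status_code == 404:
            raise FileNotFoundError(url)
        return resp.headers.get("last-modified")

# illustrative values only; the real watcher compares against the
# "lastmodified" meta stored on the matching S3 object
url = "https://archive.org/download/stackexchange/beer.stackexchange.com.7z"
upstream = get_last_modified_for(url)
stored = "Tue, 07 May 2024 10:00:00 GMT"  # made-up stored value
if upstream != stored:
    print(f"{url} changed upstream (now {upstream}): re-download, re-upload, re-schedule")
```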
16 changes: 15 additions & 1 deletion dev/README.md
@@ -41,6 +41,10 @@ This container is the main worker container, responsible to start tasks. It is c
 
 This container is a sample task executor. It is commented by default.
 
+### watcher
+
+This container is a StackExchange dumps watcher.
+
 ## Instructions
 
 First start the Docker-Compose stack:
@@ -169,4 +173,14 @@ docker exec -it zf_receiver /contrib/create-warehouse-paths.sh
 
 You can start a task manager manually simply by requesting a task in the UI and starting it manually (see above).
 
-Once the task is reserved for the `test_worker`, you can modify the `task_worker` container `command` in `docker-compose.yml` with this ID, uncomment the `task_worker` section and start it.
+Once the task is reserved for the `test_worker`, you can modify the `task_worker` container `command` in `docker-compose.yml` with this ID, uncomment the `task_worker` section and start it.
+
+### start a StackExchange dumps watcher
+
+Uncomment the watcher configuration in docker-compose.yml
+
+Set up a proper S3 URL to a test bucket (including credentials in the URL)
+
+By default, only the beer.stackexchange.com domain dump is considered
+
+Tasks are scheduled as in prod when a new dump is downloaded
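Before starting the container, it can help to confirm the S3 URL actually grants access to the test bucket. A minimal sketch, assuming the kiwixstorage package (the S3 helper commonly used by openzim tools; the exact check_credentials call is an assumption here) and the placeholder URL from docker-compose.yml below:

```python
from kiwixstorage import KiwixStorage  # pip install kiwixstorage

# same shape as the S3_URL placeholder in docker-compose.yml below
s3_url = (
    "https://s3.us-west-1.wasabisys.com/"
    "?keyId=<your_key_id>&secretAccessKey=<your_secret_access_key>"
    "&bucketName=org-kiwix-dev-stackexchange"
)

s3 = KiwixStorage(s3_url)
# failsafe=True returns False instead of raising on failure (assumed behavior)
if s3.check_credentials(list_buckets=True, bucket=True, write=True, read=True, failsafe=True):
    print("S3 URL looks usable")
else:
    print("fix keyId/secretAccessKey/bucketName before starting the watcher")
```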
13 changes: 13 additions & 0 deletions dev/docker-compose.yml
@@ -95,6 +95,19 @@ services:
     depends_on:
       - backend
 
+  # # uncomment this only if you want to run a StackExchange dumps watcher
+  # watcher:
+  #   build: ../watcher
+  #   container_name: zf_watcher
+  #   command: watcher --only beer.stackexchange.com --debug
+  #   environment:
+  #     - ZIMFARM_API_URL=http://backend:8000/v1
+  #     - ZIMFARM_USERNAME=admin
+  #     - ZIMFARM_PASSWORD=admin
+  #     - S3_URL=https://s3.us-west-1.wasabisys.com/?keyId=<your_key_id>&secretAccessKey=<your_secret_access_key>&bucketName=org-kiwix-dev-stackexchange
+  #   depends_on:
+  #     - backend
+
   # # uncomment this only if you want to run a worker manager
   # worker_mgr:
   #   build:
42 changes: 20 additions & 22 deletions watcher/watcher.py
@@ -63,14 +63,12 @@
     logging.getLogger(logger_name).setLevel(logging.WARNING)
 
 
-def get_version_for(url):
-    """casted datetime of the Last-Modified header for an URL"""
+def get_last_modified_for(url):
+    """the Last-Modified header for an URL"""
    with requests.head(url, allow_redirects=True) as resp:
        if resp.status_code == 404:
            raise FileNotFoundError(url)
-        return datetime.datetime.strptime(
-            resp.headers.get("last-modified"), "%a, %d %b %Y %H:%M:%S GMT"
-        ).strftime("%Y-%m")
+        return resp.headers.get("last-modified")
 
 
 def get_token(api_url, username, password):
@@ -243,17 +241,16 @@ def zimfarm_credentials_ok(self):
         return success
 
     def retrieve_all_sites(self):
-        """version, list of domain names for which there's a dump online"""
+        """list of domain names for which there's a dump online"""
         url = f"{DOWNLOAD_URL}/Sites.xml"
         resp = requests.get(url)
         parser = XMLtoDict()
         sites = parser.parse(resp.text).get("sites", {}).get("row", [])
-        version = get_version_for(url)
 
         def _filter(item):
             return item in self.only if self.only else 1
 
-        return version, filter(
+        return filter(
             _filter, [re.sub(r"^https?://", "", site.get("@Url", "")) for site in sites]
         )
@@ -343,7 +340,7 @@ def schedule_in_zimfarm(self, domain):
 
         return payload.get("requested")
 
-    def update_file(self, key, schedule_upon_success):
+    def update_file(self, key, schedule_upon_success, last_modified):
         """Do an all-steps update of that file as we know there's a new one avail."""
         domain = re.sub(r".7z$", "", key)
         prefix = f" [{domain}]"
@@ -357,18 +354,13 @@ def update_file(self, key, schedule_upon_success):
 
         if self.s3_storage.has_object(key):
             logger.info(f"{prefix} Removing object in S3")
-            obsolete = self.s3_storage.get_object_stat(key).meta.get("version")
+            obsolete = self.s3_storage.get_object_stat(key).meta.get("lastmodified")
             self.s3_storage.delete_object(key)
-            logger.info(f"{prefix} Removed object (was version {obsolete})")
+            logger.info(f"{prefix} Removed object (was from {obsolete})")
 
         logger.info(f"{prefix} Downloading…")
         url = f"{DOWNLOAD_URL}/{key}"
         fpath = self.work_dir / key
-        try:
-            version = get_version_for(url)
-        except FileNotFoundError:
-            logger.error(f"{url} is missing upstream. Skipping.")
-            return
 
         wget = subprocess.run(
             ["/usr/bin/env", "wget", "-O", fpath, url],
@@ -383,7 +375,9 @@
         logger.info(f"{prefix} Download completed")
 
         logger.info(f"{prefix} Uploading to S3…")
-        self.s3_storage.upload_file(fpath=fpath, key=key, meta={"version": version})
+        self.s3_storage.upload_file(
+            fpath=fpath, key=key, meta={"lastmodified": last_modified}
+        )
         logger.info(f"{prefix} Uploaded")
 
         fpath.unlink()
@@ -400,9 +394,9 @@ def update_file(self, key, schedule_upon_success):
             logger.warning(f"{prefix} couldn't schedule recipe(s)")
 
     def check_and_go(self):
-        logger.info("Checking StackExchange version…")
-        version, domains = self.retrieve_all_sites()
-        logger.info(f"Latest online version: {version}. Comparing with S3…")
+        logger.info("Getting list of SE domains")
+        domains = self.retrieve_all_sites()
+        logger.info("Grabbing online versions and comparing with S3…")
 
         self.domains_futures = {}  # future: key
         self.domains_executor = cf.ThreadPoolExecutor(max_workers=self.nb_threads)
@@ -428,10 +422,13 @@ def check_and_go(self):
                 keys = [f"{domain}.7z"]
 
             for key in keys:
+                url = f"{DOWNLOAD_URL}/{key}"
+
+                last_modified = get_last_modified_for(url)
                 if not self.s3_storage.has_object_matching(
-                    key, meta={"version": version}
+                    key, meta={"lastmodified": last_modified}
                 ):
-                    logger.info(f" [+] {key}")
+                    logger.info(f" [+] {key} (from {last_modified})")
 
                     # update shall trigger a new recipe schedule on Zimfarm upon compl.
                     # - unless requested not to
@@ -444,6 +441,7 @@
                         self.update_file,
                         key=key,
                         schedule_upon_success=schedule_upon_success,
+                        last_modified=last_modified,
                     )
                     self.domains_futures.update({future: key})
 
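For context on the retrieve_all_sites() changes above: the watcher derives its domain list from the dumps' Sites.xml, and the --only option narrows it. A rough standalone equivalent, using the standard library's ElementTree in place of the watcher's XMLtoDict parser and assuming DOWNLOAD_URL points at the archive.org stackexchange collection (an assumption, not confirmed by this diff):

```python
import re
import xml.etree.ElementTree as ET

import requests

DOWNLOAD_URL = "https://archive.org/download/stackexchange"  # assumed value

def retrieve_all_sites(only=None):
    """domain names for which there's a dump online, optionally filtered"""
    resp = requests.get(f"{DOWNLOAD_URL}/Sites.xml")
    # Sites.xml is <sites><row Url="https://..." .../>...</sites>
    rows = ET.fromstring(resp.content).findall("row")
    domains = [re.sub(r"^https?://", "", row.get("Url", "")) for row in rows]
    return [domain for domain in domains if not only or domain in only]

# mirrors the dev setup: watcher --only beer.stackexchange.com
print(retrieve_all_sites(only={"beer.stackexchange.com"}))
```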
