maint: auto update kg proxy #39

Merged · 2 commits · Mar 13, 2024
45 changes: 45 additions & 0 deletions .github/workflows/cron-update-kg-proxy.yml
@@ -0,0 +1,45 @@
name: '[cron] update KG proxy'

on:
  schedule:
    - cron: '5 4 * * 0' # every Sunday at 04:05 UTC

env:
  EBRAINS_CCFLOW_CLIENT_ID: ${{ secrets.EBRAINS_OIDC_SIIBRA_CI_CLIENT_ID }}
  EBRAINS_CCFLOW_CLIENT_SECRET: ${{ secrets.EBRAINS_OIDC_SIIBRA_CI_CLIENT_SECRET }}

jobs:
  update:
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        include:
          - artefacts-dir: "ebrainsquery/v3/AtlasAnnotation"
            query-id: "eb4dd5b4-9364-4357-ac37-c61833831a82"

          - artefacts-dir: "ebrainsquery/v3/ParcellationEntity"
            query-id: "d236e213-c048-4331-bfd9-b45c60bc3d03"

          - artefacts-dir: "ebrainsquery/v3/ParcellationEntityVersion"
            query-id: "5e80bb84-68cf-4425-859b-5306f6838693"

          - artefacts-dir: "ebrainsquery/v3/CustomAnatomicalEntity"
            query-id: "d7cdc1d6-0aa5-498a-94ff-0291b95db13d"
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: 'Install dependencies'
        run: |
          pip install ebrains-drive
          pip install git+https://github.com/xgui3783/ebrains-iam-util.git
      - name: 'Fetch and Sync to KG proxy'
        run: |
          export EBRAINS_QUERY_ID=${{ matrix.query-id }}
          export EBRAINS_ARTEFACTS_DIR=${{ matrix.artefacts-dir }}
          export EBRAINS_CCFLOW_CLIENT_ID=${{ env.EBRAINS_CCFLOW_CLIENT_ID }}
          export EBRAINS_CCFLOW_CLIENT_SECRET=${{ env.EBRAINS_CCFLOW_CLIENT_SECRET }}

          python _ci/cron_sync_kg_proxy.py
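
Each matrix entry pairs an EBRAINS KG query id with the artefacts directory it is mirrored to in the bucket. Before adding a new entry, a candidate query id can be sanity-checked with the same endpoint and token flow the sync script below uses. This is only a sketch: it assumes the client credentials are exported as in the workflow's env block and that `requests` and `ebrains-iam-util` are installed.

```python
# Sketch: count the released instances behind a candidate KG query id.
# Assumes EBRAINS_CCFLOW_CLIENT_ID / EBRAINS_CCFLOW_CLIENT_SECRET are set in the
# environment, mirroring the workflow's env block.
import os

import requests
from ebrains_iam.client_credential import ClientCredentialsSession

QUERY_ID = "eb4dd5b4-9364-4357-ac37-c61833831a82"  # e.g. the AtlasAnnotation query

token = ClientCredentialsSession(
    os.environ["EBRAINS_CCFLOW_CLIENT_ID"],
    os.environ["EBRAINS_CCFLOW_CLIENT_SECRET"],
    scope=["team"],
).get_token()

resp = requests.get(
    f"https://core.kg.ebrains.eu/v3/queries/{QUERY_ID}/instances",
    params={"stage": "RELEASED", "size": "1", "from": "0"},
    headers={"Authorization": f"Bearer {token}"},
)
resp.raise_for_status()
print("total released instances:", resp.json().get("total"))
```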
97 changes: 97 additions & 0 deletions _ci/cron_sync_kg_proxy.py
@@ -0,0 +1,97 @@
"""Fetch all released instances of an EBRAINS KG query and mirror each one as a
JSON file into the `reference-atlas-data` bucket (the KG proxy)."""
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat
import json
from io import StringIO

from tqdm import tqdm
import requests
from ebrains_iam.client_credential import ClientCredentialsSession
from ebrains_drive import BucketApiClient

EBRAINS_CCFLOW_CLIENT_ID = os.getenv("EBRAINS_CCFLOW_CLIENT_ID")
EBRAINS_CCFLOW_CLIENT_SECRET = os.getenv("EBRAINS_CCFLOW_CLIENT_SECRET")

EBRAINS_QUERY_ID = os.getenv("EBRAINS_QUERY_ID")
EBRAINS_ARTEFACTS_DIR = os.getenv("EBRAINS_ARTEFACTS_DIR")

KG_ROOT = os.getenv("KG_ROOT", "https://core.kg.ebrains.eu")


def get_paginated(url: str, size: int, from_: int, token: str):
    """Fetch a single page of released instances of the configured KG query."""
    resp = requests.get(
        url,
        params={"stage": "RELEASED", "size": str(size), "from": str(from_)},
        headers={"Authorization": f"Bearer {token}"},
    )
    resp.raise_for_status()
    return resp.json()


def write_to_file(path: str, data):
    # Helper for dumping a payload to a local file (not used in the CI flow).
    with open(path, "w") as fp:
        json.dump(data, indent="\t", fp=fp)
        fp.write("\n")


def main():
    assert EBRAINS_CCFLOW_CLIENT_ID, "EBRAINS_CCFLOW_CLIENT_ID must be defined"
    assert EBRAINS_CCFLOW_CLIENT_SECRET, "EBRAINS_CCFLOW_CLIENT_SECRET must be defined"
    assert EBRAINS_QUERY_ID, "EBRAINS_QUERY_ID must be defined"
    assert EBRAINS_ARTEFACTS_DIR, "EBRAINS_ARTEFACTS_DIR must be defined"

    iamclient = ClientCredentialsSession(EBRAINS_CCFLOW_CLIENT_ID, EBRAINS_CCFLOW_CLIENT_SECRET, scope=["team"])

    path = Path(EBRAINS_ARTEFACTS_DIR)
    path.mkdir(parents=True, exist_ok=True)

    token = iamclient.get_token()

    url = f"{KG_ROOT}/v3/queries/{EBRAINS_QUERY_ID}/instances"
    resp = get_paginated(url, 50, 0, token=token)

    total = resp.get("total")

    print(f"Getting {total=} instances, paginated by 50...")

    # The first page was fetched above; fetch the remaining pages concurrently.
    with ThreadPoolExecutor(max_workers=6) as ex:
        remaining_results = list(ex.map(
            get_paginated,
            repeat(url),
            repeat(50),
            range(50, total, 50),
            repeat(token)
        ))

    print("Uploading files ...")

    bucketclient = BucketApiClient(token=token)
    bucket = bucketclient.buckets.get_bucket("reference-atlas-data")

    all_atlas_annotations = [datum for r in [resp, *remaining_results] for datum in r.get("data")]

    progress = tqdm(total=len(all_atlas_annotations), desc="Uploading", unit="Files")

    for datum in all_atlas_annotations:
        io = StringIO(json.dumps(datum, indent="\t"))
        io.seek(0)
        _id = datum.get("id").split("/")[-1]

        retry_counter = 0
        while True:
            try:
                bucket.upload(io, f"{EBRAINS_ARTEFACTS_DIR}/{_id}.json")
                progress.update()
                break
            except Exception as e:
                print(f"Error: {str(e)}")
                if retry_counter >= 5:
                    print("Retry max hit, terminating")
                    raise
                retry_counter += 1
                print("Retrying ...")

                # Rewind the buffer in case the failed attempt consumed it, then
                # refresh the token and bucket client before retrying.
                io.seek(0)
                token = iamclient.get_token()
                bucketclient = BucketApiClient(token=token)
                bucket = bucketclient.buckets.get_bucket("reference-atlas-data")


if __name__ == "__main__":
    main()
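
For an ad-hoc local run of a single matrix entry, the two query-specific variables can be set and the script executed the same way the 'Fetch and Sync to KG proxy' step does. A minimal sketch, assuming the client credentials are already in the environment and the dependencies are installed (including `requests` and `tqdm`, which the script imports directly):

```python
# Sketch: run the sync for one query/artefacts-dir pair, as the workflow does.
import os
import runpy

# Values taken from one matrix entry of the workflow above.
os.environ["EBRAINS_QUERY_ID"] = "eb4dd5b4-9364-4357-ac37-c61833831a82"
os.environ["EBRAINS_ARTEFACTS_DIR"] = "ebrainsquery/v3/AtlasAnnotation"
# EBRAINS_CCFLOW_CLIENT_ID / EBRAINS_CCFLOW_CLIENT_SECRET must already be exported.

# Execute the script as __main__ so its main() entry point runs.
runpy.run_path("_ci/cron_sync_kg_proxy.py", run_name="__main__")
```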
