Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup __fetch adb docs #35

Merged
merged 2 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ env:
jobs:
build:
runs-on: ubuntu-latest
continue-on-error: true
strategy:
matrix:
python: ["3.8", "3.9", "3.10", "3.11"] # "3.12"
Expand Down
27 changes: 12 additions & 15 deletions adbdgl_adapter/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def udf_v1_x(v1_df):

# 1. Fetch ArangoDB vertices
v_col_cursor, v_col_size = self.__fetch_adb_docs(
v_col, meta, **adb_export_kwargs
v_col, False, meta, **adb_export_kwargs
)

# 2. Process ArangoDB vertices
Expand All @@ -294,7 +294,7 @@ def udf_v1_x(v1_df):

# 1. Fetch ArangoDB edges
e_col_cursor, e_col_size = self.__fetch_adb_docs(
e_col, meta, **adb_export_kwargs
e_col, True, meta, **adb_export_kwargs
)

# 2. Process ArangoDB edges
Expand Down Expand Up @@ -614,6 +614,7 @@ def y_tensor_to_2_column_dataframe(dgl_tensor):
def __fetch_adb_docs(
self,
col: str,
is_edge: bool,
meta: Union[Set[str], Dict[str, ADBMetagraphValues]],
**adb_export_kwargs: Any,
) -> Tuple[Cursor, int]:
Expand All @@ -622,6 +623,8 @@ def __fetch_adb_docs(

:param col: The ArangoDB collection.
:type col: str
:param is_edge: True if **col** is an edge collection.
:type is_edge: bool
:param meta: The MetaGraph associated to **col**
:type meta: Set[str] | Dict[str, adbdgl_adapter.typings.ADBMetagraphValues]
:param adb_export_kwargs: Keyword arguments to specify AQL query options
Expand All @@ -631,42 +634,36 @@ def __fetch_adb_docs(
:rtype: pandas.DataFrame
"""

def get_aql_return_value(
meta: Union[Set[str], Dict[str, ADBMetagraphValues]]
) -> str:
def get_aql_return_value() -> str:
"""Helper method to formulate the AQL `RETURN` value based on
the document attributes specified in **meta**
"""
attributes = []
attributes = ["_key"]
attributes += ["_from", "_to"] if is_edge else []

if type(meta) is set:
attributes = list(meta)
attributes += list(meta)

elif type(meta) is dict:
for value in meta.values():
if type(value) is str:
attributes.append(value)
elif type(value) is dict:
attributes.extend(list(value.keys()))
attributes += list(value.keys())
elif callable(value):
# Cannot determine which attributes to extract if UDFs are used
# Therefore we just return the entire document
return "doc"

return f"""
MERGE(
{{ _key: doc._key, _from: doc._from, _to: doc._to }},
KEEP(doc, {list(attributes)})
)
"""
return f"KEEP(doc, {attributes})"

col_size: int = self.__db.collection(col).count()

with get_export_spinner_progress(f"ADB Export: '{col}' ({col_size})") as p:
p.add_task(col)

cursor: Cursor = self.__db.aql.execute(
f"FOR doc IN @@col RETURN {get_aql_return_value(meta)}",
f"FOR doc IN @@col RETURN {get_aql_return_value()}",
bind_vars={"@col": col},
**{**adb_export_kwargs, **{"stream": True}},
)
Expand Down