Skip to content

Commit

Permalink
Merge pull request #138 from TogetherCrew/wip-136
Browse files Browse the repository at this point in the history
[Hivemind] Update GitHub ETL workflow
  • Loading branch information
cyri113 authored May 2, 2024
2 parents 222e04d + bcf7dd1 commit a947cd0
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 66 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,5 @@ credentials.json

airflow_env/*
main.ipynb
main.py
main.py
dags/sample_test_etl.py
16 changes: 14 additions & 2 deletions dags/hivemind_etl_helpers/github_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@


def process_github_vectorstore(
community_id: str, github_org_id: str, from_starting_date: datetime | None = None
community_id: str,
github_org_ids: list[str],
repo_ids: list[str],
from_starting_date: datetime | None = None,
) -> None:
"""
ETL process for github raw data
Expand All @@ -29,6 +32,12 @@ def process_github_vectorstore(
------------
community_id : str
the community to save github's data
github_org_ids : list[str]
a list of github organization ids whose data should be processed
repo_ids : list[str]
a list of github repository ids whose data should be processed
from_starting_date : datetime | None
the date to start processing data from
"""
load_dotenv()
dbname = f"community_{community_id}"
Expand Down Expand Up @@ -56,7 +65,10 @@ def process_github_vectorstore(

logging.info(f"Fetching data from date: {from_date}")

repository_ids = get_github_organization_repos(github_organization_id=github_org_id)
org_repository_ids = get_github_organization_repos(
github_organization_ids=github_org_ids
)
repository_ids = list(set(repo_ids + org_repository_ids))
logging.info(f"{len(repository_ids)} repositories to fetch data from!")

# EXTRACT
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,44 @@
import logging

from github.neo4j_storage.neo4j_connection import Neo4jConnection
from github.neo4j_storage.neo4j_enums import Node


def get_github_organization_repos(github_organization_id: str) -> list[int]:
def get_github_organization_repos(github_organization_ids: list[str]) -> list[int]:
"""
get repositories of given organization id list
Parameters
------------
github_organization_ids : list[str]
a list of github organization ids whose repositories should be fetched
Returns
---------
repo_ids : list[int]
fetched repository ids from organizations
"""
neo4j_connection = Neo4jConnection()
neo4j_driver = neo4j_connection.connect_neo4j()

with neo4j_driver.session() as session:
query = (
f"MATCH (go:{Node.GitHubOrganization.value} {{id: $org_id}})"
f"MATCH (go:{Node.GitHubOrganization.value})"
f"<-[:IS_WITHIN]-(repo:{Node.Repository.value})"
"RETURN COLLECT(repo.id) as repoIds"
)
results = session.execute_read(
lambda tx: list(tx.run(query, org_id=github_organization_id))
f"WHERE go.id IN $org_ids"
" RETURN COLLECT(repo.id) as repoIds"
)
try:
results = session.execute_read(
lambda tx: list(tx.run(query, org_ids=github_organization_ids))
)
except Exception as exp:
logging.error(
"Failed to execute Neo4j get organization repos query!"
"Returning empty array"
f"| Exception: {exp}"
)
return []

# it's always one result as we applied `COLLECT` in query
assert len(results) == 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
This file is to be removed in the future, and its features will be moved to the directory
/dags/hivemind_etl_helpers/src/utils/modules/
"""

from datetime import datetime

from hivemind_etl_helpers.src.utils.mongo import MongoSingleton
Expand Down
24 changes: 12 additions & 12 deletions dags/hivemind_etl_helpers/src/utils/modules/modules_github.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ def get_learning_platforms(self):
```
[{
"community_id": "community1",
"organization_id": "12345",
"organization_ids": ["1111", "2222"],
"repo_ids": ["132", "45232"],
"from_date": datetime(2024, 1, 1)
}]
```
Expand All @@ -38,16 +39,15 @@ def get_learning_platforms(self):
if platform["name"] != self.platform_name:
continue

# learning is for doing ETL on data
if "learning" in platform["metadata"]:
learning_config = platform["metadata"]["learning"]

platforms_data.append(
{
"community_id": str(community),
"organization_id": learning_config["organizationId"],
"from_date": learning_config["fromDate"],
}
)
modules_options = platform["metadata"]

platforms_data.append(
{
"community_id": str(community),
"organization_ids": modules_options.get("organizationId", []),
"repo_ids": modules_options.get("repoIds", []),
"from_date": modules_options["fromDate"],
}
)

return platforms_data
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ def setUp(self) -> None:
session.execute_write(lambda tx: tx.run("MATCH (n) DETACH DELETE (n)"))

def test_fetch_empty_repos(self):
org_id = 123
repo_ids = get_github_organization_repos(github_organization_id=org_id)
org_id = [123]
repo_ids = get_github_organization_repos(github_organization_ids=org_id)

self.assertEqual(repo_ids, [])

def test_fetch_single_repo(self):
org_id = 123
org_id = [123]
with self.neo4j_driver.session() as session:
session.execute_write(
lambda tx: tx.run(
Expand All @@ -31,11 +31,11 @@ def test_fetch_single_repo(self):
"""
)
)
repo_ids = get_github_organization_repos(github_organization_id=org_id)
repo_ids = get_github_organization_repos(github_organization_ids=org_id)
self.assertEqual(repo_ids, [100])

def test_fetch_multiple_repo(self):
org_id = 123
org_id = [123]
with self.neo4j_driver.session() as session:
session.execute_write(
lambda tx: tx.run(
Expand All @@ -50,5 +50,31 @@ def test_fetch_multiple_repo(self):
"""
)
)
repo_ids = get_github_organization_repos(github_organization_id=org_id)
repo_ids = get_github_organization_repos(github_organization_ids=org_id)
self.assertEqual(set(repo_ids), set([100, 101, 102]))

def test_fetch_repos_multiple_orgs(self):
org_id = [123, 124]
with self.neo4j_driver.session() as session:
session.execute_write(
lambda tx: tx.run(
"""
CREATE (org:GitHubOrganization {id: 123})
CREATE (org2:GitHubOrganization {id: 124})
CREATE (repo: GitHubRepository {id: 100})
CREATE (repo2: GitHubRepository {id: 101})
CREATE (repo3: GitHubRepository {id: 102})
CREATE (repo)-[:IS_WITHIN]->(org)
CREATE (repo2)-[:IS_WITHIN]->(org)
CREATE (repo3)-[:IS_WITHIN]->(org)
CREATE (repo4: GitHubRepository {id: 200})
CREATE (repo5: GitHubRepository {id: 201})
CREATE (repo4)-[:IS_WITHIN]->(org2)
CREATE (repo5)-[:IS_WITHIN]->(org2)
"""
)
)
repo_ids = get_github_organization_repos(github_organization_ids=org_id)
self.assertEqual(set(repo_ids), set([100, 101, 102, 200, 201]))
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ def test_get_github_communities_data_single_modules(self):
"platform": ObjectId("6579c364f1120850414e0dc6"),
"name": "github",
"metadata": {
"learning": {
"fromDate": datetime(2024, 1, 1),
"organizationId": "1234",
}
"fromDate": datetime(2024, 1, 1),
"organizationId": ["1234"],
"repoIds": ["111", "234"],
},
}
]
Expand All @@ -53,7 +52,8 @@ def test_get_github_communities_data_single_modules(self):
result[0],
{
"community_id": "6579c364f1120850414e0dc5",
"organization_id": "1234",
"organization_ids": ["1234"],
"repo_ids": ["111", "234"],
"from_date": datetime(2024, 1, 1),
},
)
Expand All @@ -72,20 +72,18 @@ def test_get_github_communities_data_multiple_platforms(self):
"platform": ObjectId("6579c364f1120850414e0dc6"),
"name": "github",
"metadata": {
"learning": {
"fromDate": datetime(2024, 1, 1),
"organizationId": "1234",
}
"fromDate": datetime(2024, 1, 1),
"organizationId": ["1234"],
"repoIds": ["111", "234"],
},
},
{
"platform": ObjectId("6579c364f1120850414e0dc7"),
"name": "github",
"metadata": {
"learning": {
"fromDate": datetime(2024, 2, 2),
"organizationId": "4321",
}
"fromDate": datetime(2024, 2, 2),
"organizationId": ["4321"],
"repoIds": ["2132", "8888"],
},
},
]
Expand All @@ -102,15 +100,17 @@ def test_get_github_communities_data_multiple_platforms(self):
result[0],
{
"community_id": "1009c364f1120850414e0dc5",
"organization_id": "1234",
"organization_ids": ["1234"],
"repo_ids": ["111", "234"],
"from_date": datetime(2024, 1, 1),
},
)
self.assertEqual(
result[1],
{
"community_id": "1009c364f1120850414e0dc5",
"organization_id": "4321",
"organization_ids": ["4321"],
"repo_ids": ["2132", "8888"],
"from_date": datetime(2024, 2, 2),
},
)
Expand All @@ -132,20 +132,16 @@ def test_get_github_communities_data_multiple_platforms_multiple_communities(
"platform": ObjectId("6579c364f1120850414e0dc6"),
"name": "github",
"metadata": {
"learning": {
"fromDate": datetime(2024, 1, 1),
"organizationId": "1234",
}
"fromDate": datetime(2024, 1, 1),
"organizationId": ["1234"],
},
},
{
"platform": ObjectId("6579c364f1120850414e0dc7"),
"name": "github",
"metadata": {
"learning": {
"fromDate": datetime(2024, 2, 2),
"organizationId": "4321",
}
"fromDate": datetime(2024, 2, 2),
"repoIds": ["1111"],
},
},
]
Expand All @@ -160,20 +156,16 @@ def test_get_github_communities_data_multiple_platforms_multiple_communities(
"platform": ObjectId("6579c364f1120850414e0db5"),
"name": "github",
"metadata": {
"learning": {
"fromDate": datetime(2024, 3, 1),
"organizationId": "111111",
}
"fromDate": datetime(2024, 3, 1),
"organizationId": ["111111"],
},
},
{
"platform": ObjectId("6579c364f1120850414e0dc7"),
"name": "discord",
"metadata": {
"learning": {
"fromDate": datetime(2024, 3, 1),
"selectedChannels": ["666", "777"],
}
"fromDate": datetime(2024, 3, 1),
"selectedChannels": ["666", "777"],
},
},
]
Expand All @@ -188,30 +180,33 @@ def test_get_github_communities_data_multiple_platforms_multiple_communities(
self.assertEqual(len(results), 3)

for res in results:
if res["organization_id"] == "1234":
if res["organization_ids"] == ["1234"]:
self.assertEqual(
res,
{
"community_id": "1009c364f1120850414e0dc5",
"organization_id": "1234",
"organization_ids": ["1234"],
"repo_ids": [],
"from_date": datetime(2024, 1, 1),
},
)
elif res["organization_id"] == "4321":
elif res["organization_ids"] == []:
self.assertEqual(
res,
{
"community_id": "1009c364f1120850414e0dc5",
"organization_id": "4321",
"organization_ids": [],
"repo_ids": ["1111"],
"from_date": datetime(2024, 2, 2),
},
)
elif res["organization_id"] == "111111":
elif res["organization_ids"] == ["111111"]:
self.assertEqual(
res,
{
"community_id": "2009c364f1120850414e0dc5",
"organization_id": "111111",
"organization_ids": ["111111"],
"repo_ids": [],
"from_date": datetime(2024, 3, 1),
},
)
Expand Down
6 changes: 4 additions & 2 deletions dags/hivemind_github_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ def get_github_communities():
@task
def process_github_community(community_information: dict[str, str | datetime]):
community_id = community_information["community_id"]
organization_id = community_information["organization_id"]
organization_ids = community_information["organization_ids"]
repo_ids = community_information["repo_ids"]
from_date = community_information["from_date"]

logging.info(f"Starting Github ETL | community_id: {community_id}")
process_github_vectorstore(
community_id=community_id,
github_org_id=organization_id,
github_org_ids=organization_ids,
repo_ids=repo_ids,
from_starting_date=from_date,
)

Expand Down

0 comments on commit a947cd0

Please sign in to comment.