-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRoadMap.py
148 lines (120 loc) · 5.3 KB
/
RoadMap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from typing import List, Optional
from Project import Project, DateRange
from User import User
from datetime import datetime
from Task import Task, Priority
from Status import Status
class RoadMap:
    """Snapshot of the projects and tasks stored in two Notion databases.

    Attributes are filled once, in ``__init__``, by querying both databases
    through the supplied client and parsing the returned pages.
    """

    # Parsed projects, produced by parse_project_dict.
    projects: List[Project]
    # Parsed tasks, produced by parse_task_dict.
    tasks: List[Task]

    def __init__(self, project_db_identifier: str, task_db_identifier: str, notion_client):
        """Load every project and task page and parse them.

        Args:
            project_db_identifier: database id passed to the client for projects.
            task_db_identifier: database id passed to the client for tasks.
            notion_client: object exposing ``databases.query(database_id=...,
                start_cursor=...)`` that returns a dict with "results",
                "has_more" and "next_cursor" entries.
        """
        project_results = self._query_all_pages(notion_client, project_db_identifier)
        task_results = self._query_all_pages(notion_client, task_db_identifier)

        # Tasks are parsed first: parse_project_dict resolves each project's
        # task ids through find_task_by_id, which searches self.tasks.
        self.tasks = parse_task_dict(self, task_results)
        self.projects = parse_project_dict(self, project_results)

    @staticmethod
    def _query_all_pages(notion_client, database_id: str) -> List[dict]:
        """Return the concatenated "results" of every page of one database query.

        Keeps requesting the next page with ``start_cursor`` while the response
        reports ``has_more``. A response whose "results" entry is missing or
        None contributes nothing instead of raising.
        """
        response = notion_client.databases.query(database_id=database_id)
        rows = list(response.get("results") or [])
        while response.get("has_more"):
            response = notion_client.databases.query(
                database_id=database_id,
                start_cursor=response["next_cursor"],
            )
            rows.extend(response.get("results") or [])
        return rows
def parse_project_dict(self, project_dict: List[dict]) -> List[Project]:
    """Build Project objects from raw project pages.

    Pages whose "prototype" title property holds no fragments are skipped.
    Every other page becomes a Project; after it is appended to the result its
    ``tasks`` attribute is filled by resolving each entry of ``task_ids`` with
    find_task_by_id(self, ...), so an id with no matching task yields None.

    Args:
        self: object whose ``tasks`` list is searched by find_task_by_id.
        project_dict: list of page dicts (despite the name, it is iterated
            as a list).

    Returns:
        The created Project objects, in input order.
    """
    parsed = []
    for page in project_dict:
        title_fragments = page["properties"]["prototype"]["title"]
        if len(title_fragments) < 1:
            continue  # untitled page: nothing to build
        props = page["properties"]
        built = Project(
            name=title_fragments[0]["text"]["content"],
            prj_id=page["id"],
            status=Status.from_str(props["Status"]["status"]["name"]),
            timeline=parse_timeline(props["Timeline"]["date"]),
            dri_emails=parse_dri_emails(props["DRI"]["people"]),
            task_ids=[relation["id"] for relation in props["Tasks"]["relation"]],
            project_url=page["url"],
        )
        parsed.append(built)
        built.tasks = [find_task_by_id(self, linked_id) for linked_id in built.task_ids]
    return parsed
def parse_timeline(date_dict: dict) -> Optional[DateRange]:
    """Convert a date payload with 'start' and 'end' into a DateRange.

    Both values are parsed with the "%Y-%m-%d" format and passed to DateRange
    as (end, start). A TypeError anywhere in that process (for example when
    ``date_dict`` is None or one of the two values is None) results in None.

    NOTE(review): only TypeError is handled; a value that does not match the
    format raises ValueError to the caller - confirm that is intended.
    """
    try:
        start_date, end_date = (
            datetime.strptime(date_dict[bound], "%Y-%m-%d")
            for bound in ("start", "end")
        )
        return DateRange(end_date, start_date)
    except TypeError:
        return None
def parse_dri_emails(dri_dict: dict) -> List[User]:
    """Turn a collection of people payloads into User objects.

    Each element of ``dri_dict`` is indexed with ["person"]["email"] and
    ["id"], so in practice the argument is an iterable of dicts even though it
    is annotated as ``dict``.

    NOTE(review): an identical definition of this function appears later in
    the module and rebinds the name.

    Returns:
        One User per element, in iteration order.
    """
    return [
        User(email=member["person"]["email"], notion_uid=member["id"])
        for member in dri_dict
    ]
def parse_task_dict(self, task_dict: List[dict]) -> List[Task]:
    """Build Task objects from raw task pages.

    Pages whose "Name" title property holds no fragments are skipped; every
    other page is converted into a Task from its properties.

    Args:
        self: accepted for call-site symmetry; not read by this function.
        task_dict: list of page dicts (despite the name, it is iterated as a
            list).

    Returns:
        The created Task objects, in input order.
    """
    parsed = []
    for page in task_dict:
        name_fragments = page["properties"]["Name"]["title"]
        if len(name_fragments) < 1:
            continue  # untitled page: nothing to build
        props = page["properties"]
        parsed.append(
            Task(
                task_name=name_fragments[0]["text"]["content"],
                status=Status.from_str(props["Status"]["status"]["name"]),
                timeline=parse_due_date(props["Due Date"]["date"]),
                priority=parse_priority(props["Priority"]),
                dri_emails=parse_dri_emails(props["Assigned to"]["people"]),
                task_id=page["id"],
                related_project_id=[link["id"] for link in props["Project"]["relation"]],
                notion_url=page["url"],
            )
        )
    return parsed
def parse_due_date(date_dict: dict) -> Optional[DateRange]:
try:
end_date = datetime.strptime(date_dict['start'], "%Y-%m-%d")
return DateRange(end_date)
except TypeError:
return None
def parse_priority(priority_dict: dict) -> Priority:
    """Map a priority property payload to a Priority value.

    Reads ``priority_dict["select"]["name"]`` and converts it with
    Priority.from_str. If that raises TypeError (for example when the
    "select" entry is None), Priority.null is returned instead.
    """
    try:
        selection = priority_dict["select"]
        return Priority.from_str(selection["name"])
    except TypeError:
        return Priority.null
def parse_dri_emails(dri_dict: dict) -> List[User]:
    """Turn a collection of people payloads into User objects.

    Each element of ``dri_dict`` is indexed with ["person"]["email"] and
    ["id"], so in practice the argument is an iterable of dicts even though it
    is annotated as ``dict``.

    NOTE(review): this repeats an identical definition found earlier in the
    module; being the later one, it is the binding that remains after import.

    Returns:
        One User per element, in iteration order.
    """
    return [
        User(email=member["person"]["email"], notion_uid=member["id"])
        for member in dri_dict
    ]
def find_task_by_id(self, task_id: str) -> Optional[Task]:
    """Return the first entry of ``self.tasks`` whose ``task_id`` equals *task_id*.

    Returns None when no entry matches.
    """
    return next(
        (candidate for candidate in self.tasks if candidate.task_id == task_id),
        None,
    )
def find_project_by_name(self, project_name: str) -> Optional[Project]:
    """Return the first entry of ``self.projects`` whose ``project_name`` equals *project_name*.

    Returns None when no entry matches.
    """
    return next(
        (candidate for candidate in self.projects if candidate.project_name == project_name),
        None,
    )
def find_task_by_name(self, task_name: str) -> Optional[Task]:
    """Return the first entry of ``self.tasks`` whose ``task_name`` equals *task_name*.

    Returns None when no entry matches.
    """
    return next(
        (candidate for candidate in self.tasks if candidate.task_name == task_name),
        None,
    )
def access_rollup_task_name(task_reference: dict) -> Optional[str]:
    """Return the "plain_text" of the first fragment under ``task_reference["title"]``.

    Returns None when the "title" sequence is empty (IndexError); other lookup
    errors are not handled here.
    """
    try:
        leading_fragment = task_reference["title"][0]
        return leading_fragment["plain_text"]
    except IndexError:
        return None