Skip to content

Commit

Permalink
feat(judge): add priority scheduling algorithm for dispatching challe…
Browse files Browse the repository at this point in the history
…nges to judge servers

The challenge priority is implemented by the judge-server [NTOJ-Judge Commit](tobiichi3227/NTOJ-Judge@678705c)
  • Loading branch information
tobiichi3227 committed Sep 4, 2024
1 parent 42ebf54 commit f6cb976
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 46 deletions.
2 changes: 1 addition & 1 deletion src/handlers/bulletin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class BulletinHandler(RequestHandler):
@reqenv
async def get(self, bulletin_id=None):
if bulletin_id is None:
can_submit = await JudgeServerClusterService.inst.is_server_online()
can_submit = JudgeServerClusterService.inst.is_server_online()
_, bulletin_list = await BulletinService.inst.list_bulletin()
bulletin_list.sort(key=lambda b: (b['pinned'], b['timestamp']), reverse=True)

Expand Down
3 changes: 2 additions & 1 deletion src/handlers/contests/manage/pro.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async def post(self):

elif reqtype == "rechal":
pro_id = int(pro_id)
can_submit = await JudgeServerClusterService.inst.is_server_online()
can_submit = JudgeServerClusterService.inst.is_server_online()
if not can_submit:
self.error('Ejudge')
return
Expand Down Expand Up @@ -121,6 +121,7 @@ async def _rechal(rechals):
pro_id,
pro['testm_conf'],
comp_type,
ChalConst.CONTEST_REJUDGE_PRI,
)

await asyncio.create_task(_rechal(rechals=result))
Expand Down
6 changes: 3 additions & 3 deletions src/handlers/manage/judge.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class ManageJudgeHandler(RequestHandler):
@reqenv
@require_permission(UserConst.ACCTTYPE_KERNEL)
async def get(self):
judge_status_list = await JudgeServerClusterService.inst.get_servers_status()
judge_status_list = JudgeServerClusterService.inst.get_servers_status()
await self.render('manage/judge', page='judge', judge_status_list=judge_status_list)

@reqenv
Expand All @@ -30,7 +30,7 @@ async def post(self):
if reqtype == 'connect':
index = int(self.get_argument('index'))

err, server_inform = await JudgeServerClusterService.inst.get_server_status(index)
err, server_inform = JudgeServerClusterService.inst.get_server_status(index)
if (server_name := server_inform['name']) == '':
server_name = f"server-{index}"

Expand All @@ -52,7 +52,7 @@ async def post(self):
index = int(self.get_argument('index'))
pwd = str(self.get_argument('pwd'))

err, server_inform = await JudgeServerClusterService.inst.get_server_status(index)
err, server_inform = JudgeServerClusterService.inst.get_server_status(index)
if (server_name := server_inform['name']) == '':
server_name = f"server-{index}"

Expand Down
3 changes: 2 additions & 1 deletion src/handlers/manage/pro.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ async def post(self, page=None):
return

pro_id = int(self.get_argument('pro_id'))
can_submit = await JudgeServerClusterService.inst.is_server_online()
can_submit = JudgeServerClusterService.inst.is_server_online()
if not can_submit:
self.error('Ejudge')
return
Expand Down Expand Up @@ -339,6 +339,7 @@ async def _rechal(rechals):
pro_id,
pro['testm_conf'],
comp_type,
ChalConst.NORMAL_REJUDGE_PRI,
)

await asyncio.create_task(_rechal(rechals=result))
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/pro.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ async def get(self, pro_id):
if result['state'] is None or result['state'] != ChalConst.STATE_AC:
pro['tags'] = ''

can_submit = await JudgeServerClusterService.inst.is_server_online()
can_submit = JudgeServerClusterService.inst.is_server_online()

await self.render(
'pro',
Expand Down
16 changes: 13 additions & 3 deletions src/handlers/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async def get(self, pro_id=None):

pro_id = int(pro_id)

# TODO: if problem is makefile type, we should restrict compiler type
allow_compilers = ChalConst.ALLOW_COMPILERS
if self.contest:
if not self.contest.is_running() and not self.contest.is_admin(self.acct):
Expand All @@ -33,7 +34,7 @@ async def get(self, pro_id=None):

allow_compilers = self.contest.allow_compilers

can_submit = await JudgeServerClusterService.inst.is_server_online()
can_submit = JudgeServerClusterService.inst.is_server_online()

if not can_submit:
self.finish('<h1 style="color: red;">All Judge Server Offline</h1>')
Expand All @@ -56,7 +57,7 @@ async def get(self, pro_id=None):
@require_permission([UserConst.ACCTTYPE_USER, UserConst.ACCTTYPE_KERNEL])
@contest_require_permission('all')
async def post(self):
can_submit = await JudgeServerClusterService.inst.is_server_online()
can_submit = JudgeServerClusterService.inst.is_server_online()

if not can_submit:
self.error('Ejudge')
Expand All @@ -73,13 +74,16 @@ async def post(self):
comp_type = str(self.get_argument('comp_type'))

if self.contest:
pri = ChalConst.CONTEST_PRI
if not self.contest.is_running() and not self.contest.is_admin(self.acct):
self.error('Eacces')
return

if pro_id not in self.contest.pro_list:
self.error('Enoext')
return
else:
pri = ChalConst.NORMAL_PRI

err = await self.is_allow_submit(code, comp_type, pro_id)
if err:
Expand Down Expand Up @@ -107,6 +111,10 @@ async def post(self):
elif reqtype == 'rechal':
if ((self.contest is None and self.acct.is_kernel()) # not in contest
or (self.contest and self.contest.is_admin(self.acct))): # in contest
if self.contest:
pri = ChalConst.CONTEST_REJUDGE_PRI
else:
pri = ChalConst.NORMAL_REJUDGE_PRI

chal_id = int(self.get_argument('chal_id'))

Expand All @@ -129,6 +137,7 @@ async def post(self):
pro_id,
pro['testm_conf'],
comp_type,
pri=pri
)
if err:
self.error(err)
Expand All @@ -139,6 +148,7 @@ async def post(self):

self.finish(json.dumps(chal_id))
return

async def is_allow_submit(self, code: str, comp_type: str, pro_id: int):
# limits variable config
allow_compilers = ChalConst.ALLOW_COMPILERS
Expand All @@ -153,6 +163,7 @@ async def is_allow_submit(self, code: str, comp_type: str, pro_id: int):
if len(code) > ProService.CODE_MAX:
return 'Ecodemax'

# TODO: if problem is makefile type, we should restrict compiler type
if comp_type not in allow_compilers:
return 'Ecomp'

Expand Down Expand Up @@ -185,5 +196,4 @@ async def is_allow_submit(self, code: str, comp_type: str, pro_id: int):
else:
await self.rs.set(last_submit_name, int(time.time()))


return None
16 changes: 8 additions & 8 deletions src/services/chal.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ class ChalConst:
'java': 'OpenJDK 17.0.8',
}

NORMAL_PRI = 0
CONTEST_PRI = 1
CONTEST_REJUDGE_PRI = 2
NORMAL_REJUDGE_PRI = 3


@dataclass
class ChalSearchingParam:
Expand Down Expand Up @@ -287,7 +292,7 @@ async def get_chal(self, chal_id):
},
)

async def emit_chal(self, chal_id, pro_id, testm_conf, comp_type):
async def emit_chal(self, chal_id, pro_id, testm_conf, comp_type, pri: int):
chal_id = int(chal_id)
pro_id = int(pro_id)

Expand Down Expand Up @@ -335,11 +340,7 @@ async def emit_chal(self, chal_id, pro_id, testm_conf, comp_type):

file_ext = ChalConst.FILE_EXTENSION[comp_type]

try:
code_f = open(f"code/{chal_id}/main.{file_ext}", 'rb')
code_f.close()

except FileNotFoundError:
if not os.path.isfile(f"code/{chal_id}/main.{file_ext}"):
for test in testl:
await self.update_test(chal_id, test['test_idx'], ChalConst.STATE_ERR, 0, 0, '', refresh_db=False)
await self.rs.publish('materialized_view_req', (await self.rs.get('materialized_view_counter')))
Expand All @@ -352,7 +353,7 @@ async def emit_chal(self, chal_id, pro_id, testm_conf, comp_type):

await JudgeServerClusterService.inst.send(
{
'pri': 1,
'pri': pri,
'chal_id': chal_id,
'test': testl,
'code_path': f'{chal_id}/main.{file_ext}',
Expand All @@ -361,7 +362,6 @@ async def emit_chal(self, chal_id, pro_id, testm_conf, comp_type):
'comp_type': comp_type,
'check_type': test_conf['check_type'],
},
1,
pro_id,
contest_id,
)
Expand Down
52 changes: 24 additions & 28 deletions src/services/judge.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ async def disconnect_server(self) -> Union[str, None]:

try:
self.status = False
self.running_chal_cnt = 0
self.ws.close()
self.main_task.cancel()
self.main_task = None
Expand All @@ -108,7 +109,7 @@ async def disconnect_server(self) -> Union[str, None]:

return None

async def get_server_status(self):
def get_server_status(self):
return (
None,
{
Expand Down Expand Up @@ -203,56 +204,51 @@ async def disconnect_all_server(self) -> None:
self.queue.get()
await server.disconnect_server()

async def get_server_status(self, idx):
def get_server_status(self, idx):
if idx < 0 or idx >= len(self.servers):
return 'Eparam'

_, status = await self.servers[idx].get_server_status()
_, status = self.servers[idx].get_server_status()
return None, status

async def get_servers_status(self) -> List[Dict]:
def get_servers_status(self) -> List[Dict]:
status_list: List[Dict] = []
for server in self.servers:
_, status = await server.get_server_status()
_, status = server.get_server_status()
status_list.append(status)

return status_list

async def is_server_online(self) -> bool:
def is_server_online(self) -> bool:
for server in self.servers:
_, status = await server.get_server_status()
_, status = server.get_server_status()
if status['status']:
return True

return False

async def send(self, data, pri, pro_id, contest_id) -> None:
# simple round-robin impl
async def send(self, data, pro_id, contest_id) -> None:
# priority impl

for i in range(self.idx + 1, len(self.servers)):
if self.servers[i].ws is None:
continue
if not self.is_server_online():
return

_, status = await self.servers[i].get_server_status()
while not self.queue.empty():
running_cnt, idx = self.queue.get()
_, status = self.get_server_status(idx)
if not status['status']:
continue

await self.servers[i].send(json.dumps(data))
self.servers[i].chal_map[data['chal_id']] = {"pro_id": pro_id, "contest_id": contest_id}
judge_id = status['judge_id']

self.idx = i
return

for i in range(0, len(self.servers)):
if self.servers[i].ws is None:
continue
if data['chal_id'] in self.servers[judge_id].chal_map:
self.queue.put([running_cnt, idx])
break

_, status = await self.servers[i].get_server_status()
if not status['status']:
continue
await self.servers[judge_id].send(data)
_, status = self.get_server_status(idx)

await self.servers[i].send(data)
self.servers[i].chal_map[data['chal_id']] = {"pro_id": pro_id, "contest_id": contest_id}
self.queue.put([status['running_chal_cnt'], judge_id])
self.servers[idx].chal_map[data['chal_id']] = {"pro_id": pro_id, "contest_id": contest_id}

self.idx = i
return
break

0 comments on commit f6cb976

Please sign in to comment.