-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathabc.py
468 lines (366 loc) · 13 KB
/
abc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
import io
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import IntEnum, auto, unique
from typing import AsyncIterator, Awaitable, Callable, Optional
from pydantic import field_validator
@dataclass
class Session:
"""A session representation.
Author:
@es3n1n
Attributes:
token: The authorization token.
cookies: The session cookies.
Methods:
validate: Check the validity of this session.
"""
token: Optional[str] = None
cookies: dict[str, str] = field(default_factory=dict)
def validate(self) -> bool:
return len(self.cookies) > 0 or self.token is not None
@dataclass
class ChallengeSolver:
"""A class for representing a challenge solver.
Author:
@es3n1n
Attributes:
team: The team that solved a challenge.
solved_at: The solve time.
"""
team: "Team"
solved_at: datetime
@dataclass
class ChallengeFile:
"""A class representing a challenge file attachment.
Author:
@es3n1n
Attributes:
url: The file attachment's URL.
name: The file name.
"""
url: str
name: Optional[str] = None
@dataclass
class Challenge:
"""A class representing a CTF challenge.
Author:
@es3n1n
Attributes:
id: The challenge ID (could be either numerical or in another form such as a
UUID).
tags: The challenge tags.
category: The challenge category.
name: The challenge name.
description: The challenge description.
value: The challenge value (i.e., number of awarded points) at the time of
its creation (this may change if the scoring is dynamic).
files: A list of file attachments associated to this challenge.
connection_info: The challenge connection info.
solves: The number of solves on this challenge.
solved_by: List of solvers who solved this challenge.
TODO:
Add max_attempts/attempts
Add hints
Add connection_info
Notes:
Some fields can remain unset depending on the platform.
Not all platforms use a numerical ID for the challenges, for example, rCTF uses
a UUID while CTFd prefers numerical identifiers. It is thus important to convert
the ID to an integer before using it with CTFd for example.
"""
id: str
name: str
category: str
description: str
tags: Optional[list[str]] = None
value: Optional[int] = None
files: Optional[list[ChallengeFile]] = None
images: Optional[list[ChallengeFile]] = None
connection_info: Optional[str] = None
solves: Optional[int] = None
solved_by: Optional[list[ChallengeSolver]] = None
solved_by_me: bool = False
@classmethod
@field_validator("solved_by", mode="before")
def validate_solved_by(
cls, value: Optional[list[ChallengeSolver]]
) -> Optional[list[ChallengeSolver]]:
if value is None:
return value
# Sort the solvers by their solve time
value.sort(key=lambda x: x.solved_at)
return value
@unique
class SubmittedFlagState(IntEnum):
"""An enum representing submitted flag states.
Author:
@es3n1n
"""
ALREADY_SUBMITTED = auto()
INCORRECT = auto()
CORRECT = auto()
CTF_NOT_STARTED = auto()
CTF_PAUSED = auto()
CTF_ENDED = auto()
INVALID_CHALLENGE = auto()
INVALID_USER = auto()
RATE_LIMITED = auto()
UNKNOWN = auto()
@dataclass
class Retries:
"""A class representing submission retries.
Author:
@es3n1n
Attributes:
left: The number of retries left.
out_of: The maximum number of retries, or None if the number of retries is
unlimited.
"""
left: int
out_of: Optional[int] = None
@dataclass
class SubmittedFlag:
"""A class representing a flag submission.
Author:
@es3n1n
Attributes:
state: The submitted flag state.
retries: Submission retries.
is_first_blood: Whether this was the first successful submission.
"""
state: SubmittedFlagState
retries: Optional[Retries] = None
is_first_blood: bool = False
async def update_first_blood(
self,
ctx: "PlatformCTX",
solvers_getter: Callable[..., AsyncIterator[ChallengeSolver]],
challenge_getter: Callable[..., Awaitable[Optional[Challenge]]],
challenge_id: str,
me: Optional["Team"] = None,
) -> None:
"""Update the `is_first_blood` attribute.
Arguments:
ctx: The platform context.
solvers_getter: A platform-specific function for retrieving the solver for
a specific challenge.
challenge_getter: A platform-specific function for retrieving the challenge
by its ID.
challenge_id: The ID of the challenge for which we want to check if we got
first blood.
me: A representation of our team.
"""
if self.state != SubmittedFlagState.CORRECT:
return
# If there's no team object, then we should try to detect first blood via
# pulling solves count of the challenge
if me is None:
challenge: Optional[Challenge] = await challenge_getter(ctx, challenge_id)
self.is_first_blood = challenge is not None and challenge.solves <= 1
return
# Query the solvers of this challenge.
solvers_generator: Optional[AsyncIterator[ChallengeSolver]] = solvers_getter(
ctx=ctx, challenge_id=challenge_id, limit=1
)
if solvers_generator is None:
# Something went wrong.
return
first_solver: Optional[ChallengeSolver] = await anext(solvers_generator, None)
if first_solver is None:
# XXX (hfz) Something went wrong, unless there's caching at play and the
# platform is still returning an empty list. It doesn't apply to CTFd or
# rCTF, this might need a reimplementation if we encounter a platform with
# such behavior, until then, keeping the code simple is preferred.
# Note that this assumes the returned solvers are sorted by solve time.
return
self.is_first_blood = first_solver.team == me
@dataclass
class Team:
"""A class representing CTF team information as returned by the CTF platform.
Author:
@es3n1n
Attributes:
id: The team ID.
name: The team name.
score: The current team score.
invite_token: The team invite token (only used for rCTF.)
solves: A list of challenges that this team solved (only used for rCTF).
"""
id: str
name: str
score: Optional[int] = None
invite_token: Optional[str] = None
solves: Optional[list[Challenge]] = None
def __eq__(self, other: Optional["Team"]) -> bool:
return other is not None and (self.id == other.id or self.name == other.name)
@dataclass
class TeamScoreHistory:
"""A class representing CTF team score history.
Author:
@es3n1n
Attributes:
name: The team name.
is_me: Set to true if this team is the team that we're currently authorized as.
history: Score history.
"""
@dataclass
class HistoryItem:
time: datetime
score: int
name: str
is_me: bool = False
history: list[HistoryItem] = field(default_factory=list)
@dataclass
class RegistrationStatus:
"""A class representing a team registration status.
Author:
@es3n1n
Attributes:
success: Whether the registration was successful.
message: The response message returned by the CTF platform.
token: The authorization token returned by the CTF platform (only used for
rCTF).
invite: The team invite URL (only used for rCTF).
"""
success: bool
message: Optional[str] = None
token: Optional[str] = None
invite: Optional[str] = None
@dataclass
class PlatformCTX:
"""A class representing a platform context.
Author:
@es3n1n
Attributes:
base_url: The platform base URL.
args: A custom set of arguments, such as `email`, `login`, `password`, `token`,
and so on. None of these are required by default and everything should be
checked within the platform itself.
session: The session object for accessing private sections of a platform.
Properties:
url_stripped: Return the base URL without a trailing slash.
Methods:
get_args: Get arguments from the set of custom arguments, optionally extending
them with additional items.
is_authorized: Check if our session is valid for querying private sections of
the platform.
login: Login to the CTF platform and store the login session.
Notes:
Ideally we should have automatic validation for the attributes (e.g., Pydantic).
"""
base_url: str
args: dict[str, str] = field(default_factory=dict)
session: Optional[Session] = None
@property
def url_stripped(self) -> str:
return self.base_url.strip("/")
@staticmethod
def from_credentials(credentials: dict[str, str]) -> "PlatformCTX":
"""Custom constructor that initializes a class instance from a set of
credentials.
Args:
credentials: A dictionary of credentials. It must contain at least at the
URL of the CTF platform.
"""
return PlatformCTX(
base_url=credentials["url"],
args=credentials,
)
def get_args(self, *required_fields: str, **kwargs: str) -> dict[str, str]:
"""Get arguments from the set of custom arguments (i.e., self.args), optionally
extending them with additional items.
Args:
required_fields: A set of required field names.
kwargs: Additional (key, value) pairs to include in the result.
Returns:
A dictionary of arguments (e.g., email, password, etc.).
"""
result: dict[str, str] = {
key: value for key, value in self.args.items() if key in required_fields
}
for k, v in kwargs.items():
result[k] = v
return result
def is_authorized(self) -> bool:
"""Check whether our session is authorized.
Returns:
True if our session is authorized, False otherwise.
"""
return self.session is not None and self.session.validate()
async def login(self, login_routine: Callable) -> bool:
"""Attempt to log in to the CTF platform using the platform-specific login
routine if the current session is unauthorized.
Args:
login_routine: The function that handles the login routine.
Returns:
A boolean representing whether we've been authorized.
"""
if not self.is_authorized():
self.session = await login_routine(self)
return self.is_authorized()
class PlatformABC(ABC):
"""An abstract base class representing a CTF platform.
Author:
@es3n1n
Notes:
If some methods return None instead of the result, it means that
something went horribly wrong within the communication logic, it might be worth
to try again.
"""
name: str = "ABC"
@classmethod
@abstractmethod
async def match_platform(cls, ctx: PlatformCTX) -> bool:
pass
@classmethod
@abstractmethod
async def login(cls, ctx: PlatformCTX) -> Optional[Session]:
pass
@classmethod
@abstractmethod
async def fetch(cls, ctx: PlatformCTX, url: str) -> Optional[io.BytesIO]:
pass
@classmethod
@abstractmethod
async def submit_flag(
cls, ctx: PlatformCTX, challenge_id: str, flag: str
) -> Optional[SubmittedFlag]:
pass
@classmethod
@abstractmethod
async def pull_challenges(cls, ctx: PlatformCTX) -> AsyncIterator[Challenge]:
yield Challenge(id="")
@classmethod
@abstractmethod
async def pull_scoreboard(
cls, ctx: PlatformCTX, max_entries_count: int = 20
) -> AsyncIterator[Team]:
yield Team(id="", name="")
@classmethod
@abstractmethod
async def pull_scoreboard_datapoints(
cls, ctx: PlatformCTX, count: int = 10
) -> Optional[list[TeamScoreHistory]]:
pass
@classmethod
@abstractmethod
async def register(cls, ctx: PlatformCTX) -> RegistrationStatus:
pass
@classmethod
@abstractmethod
async def get_challenge(
cls, ctx: PlatformCTX, challenge_id: str
) -> Optional[Challenge]:
pass
@classmethod
@abstractmethod
async def pull_challenge_solvers(
cls, ctx: PlatformCTX, challenge_id: str, limit: int = 10
) -> AsyncIterator[ChallengeSolver]:
yield ChallengeSolver(team=Team(id="", name=""), solved_at=datetime.utcnow())
@classmethod
@abstractmethod
async def get_me(cls, ctx: PlatformCTX) -> Optional[Team]:
pass