-
Notifications
You must be signed in to change notification settings - Fork 1
/
browser_test.py
144 lines (111 loc) · 3.78 KB
/
browser_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/python3
import asyncio
import functools
import os
import re
import subprocess
import sys
import selenium.webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions
class AsyncBrowser:
    """Runs Chrome and provides async methods for concurrency.

    Blocking WebDriver calls are pushed to the event loop's default executor
    through ``_call`` so that several browsers can be driven concurrently
    from one event loop.
    """

    def __init__(self):
        # Assigned first so that stop() (and therefore __del__) stays safe
        # even if acquiring the event loop below raises.
        self.driver = None
        self.loop = asyncio.get_event_loop()

    def __del__(self):
        self.stop()

    async def start(self):
        """Starts Chrome with browser console logging enabled."""
        # Work on a copy: DesiredCapabilities.CHROME is a class-level dict,
        # so mutating it in place would leak into every other user of it.
        caps = DesiredCapabilities.CHROME.copy()
        caps['loggingPrefs'] = {'browser': 'ALL'}
        self.driver = await self._call(
            selenium.webdriver.Chrome, desired_capabilities=caps)

    def stop(self) -> None:
        """Quits Chrome if it is running. Safe to call more than once."""
        # getattr: the attribute may be missing on a partially constructed
        # instance that is being finalized by __del__.
        driver = getattr(self, "driver", None)
        # Drop the reference before quitting so that a failing quit() is not
        # retried by a later stop() or by __del__.
        self.driver = None
        if driver is not None:
            driver.quit()

    async def get(self, *args, **kwargs):
        """Calls ``driver.get`` with the given arguments off the event loop."""
        return await self._call(self.driver.get, *args, **kwargs)

    async def _call(self, func, *args, **kwargs):
        """Calls the function asynchronously in a thread pool."""
        call = functools.partial(func, *args, **kwargs)
        return await self.loop.run_in_executor(None, call)
class User(AsyncBrowser):
    """A user with its own instance of Chrome running."""

    # Root URL of the game server under test.
    BASE_URL = "http://localhost:8080/"
    # Seconds to wait for page elements to appear or disappear.
    WAIT_TIMEOUT = 10

    def __init__(self, name: str):
        super().__init__()
        self.name = name
        # ID of the game created by login(); None until login() has run.
        self.game = None

    async def login(self):
        """Creates an account for the user.

        Opens the start page, submits the user's name in the form and reads
        the game ID from the URL the browser ends up on.

        Returns:
            The game ID (a lowercase hex string), also stored in ``self.game``.

        Raises:
            RuntimeError: If the resulting URL does not contain a game ID.
        """
        await self.get(self.BASE_URL)
        # Submit the name in the form. Every WebDriver call blocks on the
        # browser, so run them in the thread pool to keep the event loop
        # free for the other users.
        name_field = await self._call(
            self.driver.find_element, By.NAME, "name")
        await self._call(name_field.send_keys, self.name)
        form = await self._call(self.driver.find_element, By.TAG_NAME, "form")
        await self._call(form.submit)
        # We have been redirected to the game page. Get
        # the game ID.
        url = await self._call(lambda: self.driver.current_url)
        match = re.match(r".*/\?g=([0-9a-f]+)$", url)
        if match is None:
            raise RuntimeError(
                "{}: no game ID in URL after login: {!r}".format(
                    self.name, url))
        self.game = match.group(1)
        return self.game

    async def join(self, game):
        """Joins the specified game."""
        await self.get("{}?g={}".format(self.BASE_URL, game))

    def set_ready(self) -> None:
        """Ticks the "ready" check box once it is visible on the page."""
        # Locate and wait in one step: the element may not be in the DOM yet
        # right after the page was opened.
        check_box = WebDriverWait(self.driver, self.WAIT_TIMEOUT).until(
            expected_conditions.visibility_of_element_located(
                (By.ID, "isReadyCheckBox")))
        check_box.click()

    def check_logs(self) -> None:
        """Prints the browser console log and fails on any non-INFO entry."""
        for line in self.driver.get_log("browser"):
            print(self.name, line["level"], line["message"])
            assert line["level"] == "INFO", line["message"]

    def ignore_logs(self) -> None:
        """Fetches the browser console log and discards the entries."""
        self.driver.get_log("browser")

    def move(self, piece_id: str, square_id: str) -> None:
        """Drags the element with ID piece_id onto the one with ID square_id."""
        piece = self.driver.find_element(By.ID, piece_id)
        square = self.driver.find_element(By.ID, square_id)
        ActionChains(self.driver).drag_and_drop(piece, square).perform()

    def wait_until_disappeared(self, id):
        """Waits until the element with the given ID is invisible or gone.

        The parameter keeps its original name (shadowing the builtin) so
        that existing keyword callers continue to work.
        """
        WebDriverWait(self.driver, self.WAIT_TIMEOUT).until(
            expected_conditions.invisibility_of_element_located((By.ID, id)))
async def browser_test():
    """Starts a game between two users.

    Plays until the first piece is captured and waits for it to disappear.
    Both browsers are shut down afterwards, whether the test passed or not.
    """
    user1 = User("Webuser1")
    user2 = User("Webuser2")
    try:
        # Start inside the try block and let both launches settle before
        # re-raising, so that a browser which did come up is still shut down
        # by the finally clause when the other one failed to start.
        started = await asyncio.gather(
            user1.start(), user2.start(), return_exceptions=True)
        for outcome in started:
            if isinstance(outcome, BaseException):
                raise outcome

        game, _ = await asyncio.gather(user1.login(), user2.login())
        await user2.join(game)
        user2.set_ready()
        user1.set_ready()
        # White pawn in front of king.
        user1.move("p12", "E4")
        # Pawn to H5.
        user2.move("p31", "H5")
        # Queen strikes H5.
        user1.move("p3", "H5")
        user1.wait_until_disappeared("p31")
        user2.wait_until_disappeared("p31")
        user1.check_logs()
        user2.check_logs()
    finally:
        # Nested so that a failure while stopping the first browser cannot
        # leave the second one running.
        try:
            user1.stop()
        finally:
            user2.stop()
if __name__ == "__main__":
    # Start game server. The server script is expected to live next to this
    # file and is run with the same interpreter.
    server_script = os.path.join(os.path.dirname(__file__), "realtimechess.py")
    # Used as a context manager so the child is waited on after being
    # killed; otherwise it is left unreaped and Popen warns on destruction.
    with subprocess.Popen([sys.executable, server_script, "debug"]) as server:
        try:
            asyncio.get_event_loop().run_until_complete(browser_test())
        finally:
            server.kill()
    # Only reached when the test did not raise.
    print("\n\nOK :-D")