-
Notifications
You must be signed in to change notification settings - Fork 0
/
obs_websocket.py
205 lines (183 loc) · 9.51 KB
/
obs_websocket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import time
import os
import sys
from obswebsocket import obsws, requests
from rich import print
from dotenv import load_dotenv
# Load variables from ".env.local" at import time so that the OBS_WEBSOCKET_*
# values read with os.getenv further down are available even when this file is
# loaded on its own.
load_dotenv(dotenv_path=".env.local")
# Taking the shape of https://github.com/DougDougGithub/ChatGodApp/blob/main/obs_websockets.py, with minor changes
class OBSWebsocketsManager:
    """Convenience wrapper around an ``obsws`` client for common OBS requests.

    The connection is opened in ``__init__`` from the ``OBS_WEBSOCKET_HOST``,
    ``OBS_WEBSOCKET_PORT`` and ``OBS_WEBSOCKET_PASSWORD`` environment
    variables. If connecting fails, a message is printed and ``client`` is
    left as ``None``; every method except ``disconnect`` needs a live client.
    """

    # The obsws client, or None when the connection attempt failed.
    client = None

    # Keys copied out of the "sceneItemTransform" payload by get_source_transform.
    _TRANSFORM_KEYS = (
        "positionX",
        "positionY",
        "scaleX",
        "scaleY",
        "rotation",
        # Original (unscaled) width / height of the source.
        "sourceWidth",
        "sourceHeight",
        # Current size after scaling, not including cropping. Negative when
        # the source has been flipped horizontally / vertically.
        "width",
        "height",
        # Amounts cropped off the *original* source size. NOT scaled: multiply
        # by scaleX (left/right) or scaleY (top/bottom) to get the current
        # number of cropped pixels.
        "cropLeft",
        "cropRight",
        "cropTop",
        "cropBottom",
    )

    def __init__(self):
        """Create the client and try to connect to OBS.

        A failed connection does not raise: an error message is printed and
        ``self.client`` is reset to ``None`` so callers can detect it.
        """
        host = os.getenv("OBS_WEBSOCKET_HOST")
        port = os.getenv("OBS_WEBSOCKET_PORT")
        password = os.getenv("OBS_WEBSOCKET_PASSWORD")

        print(f"Connecting to OBS Websockets on {host}:{port}...")
        self.client = obsws(host, port, password)
        try:
            self.client.connect()
            obs_version = self.client.call(requests.GetVersion()).getObsVersion()
            print(f"[green]Connected to OBS Websockets! OBS version: {obs_version}")
        except Exception:
            # Intentionally broad (any connection/auth/protocol failure is
            # reported the same way), but no longer a bare `except:` so that
            # KeyboardInterrupt and SystemExit still propagate.
            print(
                "[red]COULD NOT CONNECT TO OBS!\nDouble check that you have OBS open and that your websockets server is enabled in OBS."
            )
            # sys.exit()
            self.client = None

    def disconnect(self):
        """Close the websocket connection; does nothing if never connected."""
        if self.client is not None:
            self.client.disconnect()

    def _get_scene_item_id(self, scene_name, source_name):
        """Return the ``sceneItemId`` of ``source_name`` inside ``scene_name``."""
        response = self.client.call(
            requests.GetSceneItemId(sceneName=scene_name, sourceName=source_name)
        )
        return response.datain["sceneItemId"]

    def set_scene(self, new_scene):
        """Switch the current program scene to ``new_scene``."""
        self.client.call(requests.SetCurrentProgramScene(sceneName=new_scene))

    def set_filter_visibility(self, source_name, filter_name, filter_enabled=True):
        """Enable or disable the filter ``filter_name`` on ``source_name``."""
        self.client.call(
            requests.SetSourceFilterEnabled(
                sourceName=source_name,
                filterName=filter_name,
                filterEnabled=filter_enabled,
            )
        )

    def get_filter_visibility(self, source_name, filter_name):
        """Return the ``filterEnabled`` value of a filter on ``source_name``."""
        response = self.client.call(
            requests.GetSourceFilter(
                sourceName=source_name,
                filterName=filter_name,
            )
        )
        return response.datain["filterEnabled"]

    def set_source_visibility(self, scene_name, source_name, source_visible=True):
        """Show or hide ``source_name`` within ``scene_name``."""
        item_id = self._get_scene_item_id(scene_name, source_name)
        self.client.call(
            requests.SetSceneItemEnabled(
                sceneName=scene_name,
                sceneItemId=item_id,
                sceneItemEnabled=source_visible,
            )
        )

    def get_source_visibility(self, scene_name, source_name):
        """Return the ``sceneItemEnabled`` value of ``source_name`` in ``scene_name``."""
        item_id = self._get_scene_item_id(scene_name, source_name)
        response = self.client.call(
            requests.GetSceneItemEnabled(
                sceneName=scene_name,
                sceneItemId=item_id,
            )
        )
        return response.datain["sceneItemEnabled"]

    def get_text(self, source_name):
        """Return the current ``text`` setting of the text source ``source_name``."""
        response = self.client.call(requests.GetInputSettings(inputName=source_name))
        return response.datain["inputSettings"]["text"]

    def set_text(self, source_name, new_text):
        """Set the ``text`` setting of the text source ``source_name``."""
        self.client.call(
            requests.SetInputSettings(
                inputName=source_name, inputSettings={"text": new_text}
            )
        )

    def get_source_transform(self, scene_name, source_name):
        """Return selected transform values of ``source_name`` in ``scene_name``.

        The result is a new dict holding the keys listed in
        ``_TRANSFORM_KEYS`` (position, scale, rotation, source size, current
        size and crop), taken from the response's ``sceneItemTransform``.
        Raises ``KeyError`` if the response lacks any of those keys.
        """
        item_id = self._get_scene_item_id(scene_name, source_name)
        response = self.client.call(
            requests.GetSceneItemTransform(sceneName=scene_name, sceneItemId=item_id)
        )
        raw_transform = response.datain["sceneItemTransform"]
        return {key: raw_transform[key] for key in self._TRANSFORM_KEYS}

    def set_source_transform(self, scene_name, source_name, new_transform):
        """Apply ``new_transform`` to ``source_name`` in ``scene_name``.

        ``new_transform`` should be a dictionary containing any of the
        following keys with corresponding values: positionX, positionY,
        scaleX, scaleY, rotation, width, height, sourceWidth, sourceHeight,
        cropTop, cropBottom, cropLeft, cropRight -- e.g.
        ``{"scaleX": 2, "scaleY": 2.5}``.

        Note: there are other transform settings, like alignment, but these
        feel like the main useful ones.
        """
        item_id = self._get_scene_item_id(scene_name, source_name)
        self.client.call(
            requests.SetSceneItemTransform(
                sceneName=scene_name,
                sceneItemId=item_id,
                sceneItemTransform=new_transform,
            )
        )

    def get_input_settings(self, input_name):
        """Return the raw ``GetInputSettings`` response for ``input_name``.

        Note: an input, like a text box, is a type of source. This gets
        *input-specific settings* (for a text source: font, color, etc.), not
        the broader source settings like transform and scale.
        """
        return self.client.call(requests.GetInputSettings(inputName=input_name))

    def get_input_kind_list(self):
        """Return the raw ``GetInputKindList`` response (all input types)."""
        return self.client.call(requests.GetInputKindList())

    def get_scene_items(self, scene_name):
        """Return the raw ``GetSceneItemList`` response for ``scene_name``."""
        return self.client.call(requests.GetSceneItemList(sceneName=scene_name))
if __name__ == "__main__":
    # Manual smoke test: connect to OBS and toggle the "rainbow" filter on the
    # "bonjour" source. The commented-out recipes below are earlier manual tests.
    obswebsockets_manager = OBSWebsocketsManager()
    if obswebsockets_manager.client is None:
        # The constructor already printed why the connection failed; every
        # call below would otherwise die with an AttributeError on None.
        sys.exit(1)

    try:
        # text = "Computer science combines the study of computation and information processing fundamentals with their application in the world around us. Computer scientists build fast, reliable, scalable and secure software systems to organize and analyze information."
        # obswebsockets_manager.set_source_visibility("Game/Desktop", "somnia", True)
        # obswebsockets_manager.set_text("somnia says", text)
        # obswebsockets_manager.set_source_visibility("Game/Desktop", "somnia says", True)
        # time.sleep(5)
        # obswebsockets_manager.set_source_visibility("Game/Desktop", "somnia", False)
        # obswebsockets_manager.set_source_visibility("Game/Desktop", "somnia says", False)
        ## Bezos Ad Break Testing
        # obswebsockets_manager.set_source_visibility("Game/Desktop", "Bezos Time", True)
        # obswebsockets_manager.set_source_visibility("Game/Desktop", "Bezos", True)
        # time.sleep(0.5)
        # obswebsockets_manager.set_source_visibility("Game/Desktop", "shaiaz", True)
        # time.sleep(5)
        # obswebsockets_manager.set_source_visibility("Game/Desktop", "shaiaz", False)
        # time.sleep(0.5)
        # obswebsockets_manager.set_source_visibility("Game/Desktop", "Bezos Time", False)
        # obswebsockets_manager.set_source_visibility("Game/Desktop", "Bezos", False)
        ## Testing meme format
        # obswebsockets_manager.set_scene("meme format")
        # obswebsockets_manager.set_filter_visibility("Game/Desktop Clone", "Freeze", True)
        # time.sleep(5)
        # obswebsockets_manager.set_scene("Game/Desktop")
        # obswebsockets_manager.set_filter_visibility("Game/Desktop Clone", "Freeze", False)
        visibility = obswebsockets_manager.get_filter_visibility("bonjour", "rainbow")
        obswebsockets_manager.set_filter_visibility("bonjour", "rainbow", not visibility)
    finally:
        # Close the websocket even if one of the requests above raised.
        obswebsockets_manager.disconnect()