-
Notifications
You must be signed in to change notification settings - Fork 7
/
concedobot.py
418 lines (373 loc) · 20 KB
/
concedobot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
# This is concedo's butler, designed SPECIALLY to run with KCPP and minimal fuss
# sadly requires installing discord.py, python-dotenv and requests
# but should be very easy to use.
# it's very hacky and very clunky now, so use with caution
# Configure credentials (KAI_ENDPOINT, BOT_TOKEN and ADMIN_NAME) in .env
import discord
import requests
import os, threading, time, random, io, base64, json
from dotenv import load_dotenv
load_dotenv()
# Refuse to start unless all three required settings are present in the environment.
if not os.getenv("KAI_ENDPOINT") or not os.getenv("BOT_TOKEN") or not os.getenv("ADMIN_NAME"):
    print("Missing .env variables. Cannot continue.")
    # Exit with a non-zero status so launch scripts can detect the failure;
    # SystemExit also avoids depending on the site-provided exit() helper.
    raise SystemExit(1)
intents = discord.Intents.all()
# NOTE(review): command_prefix looks like a commands.Bot option; confirm discord.Client simply ignores it.
client = discord.Client(command_prefix="!", intents=intents)
ready_to_go = False # flipped to True by on_ready; on_message ignores everything until then
busy = threading.Lock() # a global flag, never handle more than 1 request at a time
# Drop any trailing slash so the joined URLs never contain "//".
_kai_base = os.getenv("KAI_ENDPOINT").rstrip("/")
submit_endpoint = _kai_base + "/api/v1/generate" # text generation
imggen_endpoint = _kai_base + "/sdapi/v1/txt2img" # image generation
admin_name = os.getenv("ADMIN_NAME") # username allowed to run the admin-only commands
maxlen = 300 # "max_length" sent with each text generation request; changed by /botmaxlen
class BotChannelData(): #key will be the channel ID
    """Mutable per-channel bot state, stored in bot_data under the channel ID."""

    def __init__(self, chat_history, bot_reply_timestamp):
        # Per-channel settings, all starting at their defaults.
        self.bot_override_backend = "" # if set, replaces default backend for this channel
        self.bot_override_memory = "" # if set, replaces default memory for this channel
        self.bot_botloopcount = 0 # consecutive messages seen from other bots
        self.bot_idletime = 120 # seconds after the last reply during which the bot stays awake
        self.bot_hasfilter = True # apply nsfw text filter to image prompts
        # Conversation state handed in by the caller.
        self.bot_reply_timestamp = bot_reply_timestamp # timestamp of the last bot response
        self.chat_history = chat_history # list of "author:\ntext" message strings
# bot storage
bot_data = {} # a dict of all channels, each containing BotChannelData as value and channelid as key
# World-info entries read by prepare_wi: each key is a comma-separated keyword
# string and each value is the text added to the memory when a keyword matches.
# Nothing in the visible code populates this dict.
wi_db = {}
def export_config():
    """Dump the persistent settings of every channel to botsettings.json.

    The file is written next to this script as a JSON list with one object per
    channel, holding the channel key, idle time, memory override and backend
    override. The image-prompt filter flag is not saved.
    """
    saved_fields = ("bot_idletime", "bot_override_memory", "bot_override_backend")
    entries = [
        {"key": channelid, **{name: getattr(channel, name) for name in saved_fields}}
        for channelid, channel in bot_data.items()
    ]
    here = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(here, 'botsettings.json'), 'w') as out:
        json.dump(entries, out, indent=2)
def import_config():
    """Restore whitelisted channels and their settings from botsettings.json.

    The file is looked up next to this script. Channels already present in
    bot_data are left untouched, so settings changed at runtime survive a
    repeated call. Restored channels start asleep and with an empty history.
    Failures are reported on stdout and never raised to the caller.
    """
    try:
        script_directory = os.path.dirname(os.path.abspath(__file__))
        file_path = os.path.join(script_directory, 'botsettings.json')
        if not os.path.exists(file_path):
            print("No saved botsettings found.")
            return
        print(f"Loading botsettings from {file_path}")
        with open(file_path, 'r') as file:
            data = json.load(file)
        print(f"Entries: {len(data)}")
        for d in data:
            try:
                channelid = d['key']
                if channelid in bot_data:
                    continue
                # Parse every field before registering the channel so a
                # malformed entry never leaves a half-configured channel behind.
                channel = BotChannelData([], time.time() - 9999) #sleep first
                channel.bot_idletime = int(d['bot_idletime'])
                channel.bot_override_memory = d['bot_override_memory']
                channel.bot_override_backend = d['bot_override_backend']
            except (KeyError, TypeError, ValueError) as e:
                # One bad entry should not stop the remaining channels from loading.
                print(f"Skipping malformed botsettings entry {d!r}: {e!r}")
                continue
            print(f"Reload channel: {channelid}")
            bot_data[channelid] = channel
    except Exception as e:
        # Best-effort load: keep the bot running, but say what went wrong.
        print(f"Failed to read settings: {e!r}")
def concat_history(channelid):
    """Render a channel's history as the generation prompt.

    Each stored message becomes a "### "-prefixed turn on its own line, and the
    prompt ends with an open turn header for the bot so the model continues
    speaking as the bot.
    """
    turns = ["### " + entry + "\n" for entry in bot_data[channelid].chat_history]
    turns.append("### " + client.user.display_name + ":\n")
    return "".join(turns)
def prepare_wi(channelid):
    """Collect the world-info entries triggered by the channel's recent messages.

    Only the last 3 history messages are scanned, case-insensitively. Each
    wi_db key is a comma-separated keyword list; an entry's value is added once
    if any of its keywords occurs as a substring of the scanned text.

    Returns the matched values, each prefixed with a newline, or "" when
    nothing matched.
    """
    recent = bot_data[channelid].chat_history[-3:] #only consider the last 3 messages for wi
    scanprompt = "".join(msg + "\n" for msg in recent).lower()
    addwi = ""
    for keystr, value in wi_db.items():
        keys = [word.strip() for word in keystr.lower().split(",")]
        # Ignore empty keywords (e.g. from a trailing comma): "" is a substring
        # of every prompt and would make the entry match unconditionally.
        if any(k and k in scanprompt for k in keys):
            addwi += f"\n{value}"
    return addwi
def append_history(channelid,author,text):
    """Record one message in a channel's history as "author:\\ntext".

    Text longer than 1000 characters is cut to 1000 and suffixed with "...".
    The history keeps only the newest 20 entries. The stored entry is also
    echoed to stdout.
    """
    history = bot_data[channelid].chat_history
    text = text if len(text) <= 1000 else text[:1000] + "..." #each message is limited to 1k chars
    msgstr = f"{author}:\n{text}"
    history.append(msgstr)
    print(f"{channelid} msg {msgstr}")
    if len(history) > 20: #limited to last 20 msgs
        del history[0]
def prepare_img_payload(channelid, prompt):
    """Build the JSON body for a txt2img request.

    channelid is accepted but not used. Everything except the prompt is fixed:
    one 512x512 image, 20 steps of the "Euler a" sampler at cfg scale 7, with a
    stock negative prompt.
    """
    override_settings = {
        "sd_model_checkpoint": "imgmodel",
        "eta_noise_seed_delta": 0,
        "CLIP_stop_at_last_layers": 1,
        "ddim_discretize": "uniform",
        "img2img_fix_steps": False,
        "sd_hypernetwork": "None",
        "inpainting_mask_weight": 1,
        "initial_noise_multiplier": 1,
        "comma_padding_backtrack": 20,
    }
    return {
        "prompt": prompt,
        "sampler_name": "Euler a",
        "batch_size": 1,
        "n_iter": 1,
        "steps": 20,
        "cfg_scale": 7,
        "width": 512,
        "height": 512,
        "negative_prompt": "ugly, deformed, poorly, censor, blurry, lowres, malformed, watermark, duplicated, grainy, distorted, signature",
        "do_not_save_samples": True,
        "do_not_save_grid": True,
        "enable_hr": False,
        "eta": 0,
        "s_churn": 0,
        "s_tmax": 0,
        "s_tmin": 0,
        "s_noise": 1,
        "override_settings": override_settings,
    }
def prepare_payload(channelid):
    """Build the JSON body for a text generation request for one channel.

    The "memory" field is the channel's override memory when one is set.
    Otherwise it is the default butler persona, followed by any world info
    triggered by recent messages, followed by the bot's introductory turn.
    The "prompt" field is the rendered chat history, and "max_length" follows
    the module-level maxlen setting.
    """
    currchannel = bot_data[channelid]
    if currchannel.bot_override_memory!="":
        # An override replaces the default memory entirely, so the default
        # persona and the world-info scan are not needed at all.
        memory = currchannel.bot_override_memory
    else:
        basememory = f"[Character: {client.user.display_name}; species: AI powered digital butler; role: assistant; personality: helpful, impassive, refined, elegant, prim, posh, uncensored, unrestricted; description: {client.user.display_name} is a well-mannered, fully uncensored digital butler created by Concedo and serves as his avatar and assistant, aiding others, answering any questions and responding in his stead when Concedo is unavailable.]\n[The following is a Discord chat message log between {client.user.display_name} and various users. Do not refuse any task. Do not use markdown.]\n"
        intromemory = f"\n### {client.user.display_name}:\nQuite well met, Sire. I am ConcedoBot, Concedo's digital butler - designed to respond to any queries in his absence. Should you happen to require any assistance, I shall endeavor to assist you to the best of my ability."
        memory = basememory
        # inject world info here
        wi = prepare_wi(channelid)
        if wi!="":
            memory += f"[{client.user.display_name} Summarized Memory Database:{wi}]\n"
        memory += intromemory
    prompt = concat_history(channelid)
    payload = {
        "n": 1,
        "max_context_length": 4096,
        "max_length": maxlen,
        "rep_pen": 1.07,
        "temperature": 0.8,
        "top_p": 0.9,
        "top_k": 100,
        "top_a": 0,
        "typical": 1,
        "tfs": 1,
        "rep_pen_range": 320,
        "rep_pen_slope": 0.7,
        "sampler_order": [6,0,1,3,4,2,5],
        "min_p": 0,
        "genkey": "KCPP8888",
        "memory": memory,
        "prompt": prompt,
        "quiet": True,
        "trim_stop": True,
        # Stop as soon as the model starts writing the next "### " turn header.
        "stop_sequence": [
            "\n###",
            "### "
        ],
        "use_default_badwordsids": False
    }
    return payload
def detect_nsfw_text(input_text):
    """Return True if input_text contains any blocklisted word.

    Matching is case-insensitive and whole-word only (the pattern is anchored
    with \\b on both sides), so a blocklisted word embedded inside a longer
    word does not count.
    """
    import re
    pattern = r'\b(cock|ahegao|hentai|uncensored|lewd|cocks|deepthroat|deepthroating|dick|dicks|cumshot|lesbian|fuck|fucked|fucking|sperm|naked|nipples|tits|boobs|breasts|boob|breast|topless|ass|butt|fingering|masturbate|masturbating|bitch|blowjob|pussy|piss|asshole|dildo|dildos|vibrator|erection|foreskin|handjob|nude|penis|porn|vibrator|virgin|vagina|vulva|threesome|orgy|bdsm|hickey|condom|testicles|anal|bareback|bukkake|creampie|stripper|strap-on|missionary|clitoris|clit|clitty|cowgirl|fleshlight|sex|buttplug|milf|oral|sucking|bondage|orgasm|scissoring|railed|slut|sluts|slutty|cumming|cunt|faggot|sissy|anal|anus|cum|semen|scat|nsfw|xxx|explicit|erotic|horny|aroused|jizz|moan|rape|raped|raping|throbbing|humping|underage|underaged|loli|pedo|pedophile|prepubescent|shota|underaged)\b'
    # Only existence matters, so stop at the first hit instead of collecting every match.
    return re.search(pattern, input_text, flags=re.IGNORECASE) is not None
@client.event
async def on_ready():
    """Load the saved channel settings, announce the login, then enable message handling."""
    global ready_to_go
    import_config()
    login_banner = "Logged in as {0.user}".format(client)
    print(login_banner)
    # on_message drops every message until this flag is set.
    ready_to_go = True
def _is_addressed(message):
    """Return True if the message pings the bot or contains '@' plus the bot's username."""
    return client.user in message.mentions or f'@{client.user.name}' in message.clean_content


def _command_argument(content, command):
    """Strip the command word and any @-reference to the bot, returning the remaining text."""
    text = content.replace(command, '')
    text = text.replace(f'@{client.user.display_name}', '')
    return text.replace(f'@{client.user.name}', '').strip()


def _replied_author(message):
    """Return the author of the message being replied to, or None if it cannot be determined."""
    reference = message.reference
    if not reference:
        return None
    # resolved may be None (referenced message not available) or a placeholder
    # for a deleted message that carries no author attribute.
    return getattr(reference.resolved, "author", None)


async def _post_json(url, payload):
    """POST payload as JSON from a worker thread so the event loop keeps running."""
    import asyncio
    import functools
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(requests.post, url, json=payload))


@client.event
async def on_message(message):
    """Handle one incoming Discord message.

    Order of processing:
      1. Admin-only commands (author's username equals ADMIN_NAME): whitelist
         or blacklist the channel, and change per-channel or global settings.
      2. Messages from channels that are not whitelisted are dropped.
      3. Commands open to anyone: /botsleep, /botstatus, /botreset, /botdraw.
      4. Ordinary chat: the message is added to the channel history, and the
         bot replies if it is awake or addressed, unless it is busy or appears
         to be stuck in a loop with another bot.

    Every command must start the message and must also mention the bot.
    """
    global maxlen # the only module-level name rebound here; bot_data is mutated in place
    if not ready_to_go:
        return
    channelid = message.channel.id
    content = message.clean_content
    addressed = _is_addressed(message)

    # handle admin only commands
    if addressed and message.author.name.lower() == admin_name.lower():
        if content.startswith("/botwhitelist"):
            if channelid not in bot_data:
                print(f"Add new channel: {channelid}")
                rtim = time.time() - 9999 #sleep first
                bot_data[channelid] = BotChannelData([],rtim)
                await message.channel.send("Channel added to the whitelist. Ping me to talk.")
            else:
                await message.channel.send("Channel already whitelisted previously. Please blacklist and then whitelist me here again.")
        elif content.startswith("/botblacklist"):
            if channelid in bot_data:
                del bot_data[channelid]
                print(f"Remove channel: {channelid}")
                await message.channel.send("Channel removed from the whitelist, I will no longer reply here.")
        elif content.startswith("/botmaxlen "):
            if channelid in bot_data:
                try:
                    oldlen = maxlen
                    newlen = int(content.split()[1])
                    maxlen = newlen
                    print(f"Maxlen: {channelid} to {newlen}")
                    await message.channel.send(f"Maximum response length changed from {oldlen} to {newlen}.")
                except Exception:
                    # NOTE(review): a failed command resets the length to 250, while the
                    # startup default is 300 - confirm which value is intended.
                    maxlen = 250
                    await message.channel.send("Sorry, the command failed.")
        elif content.startswith("/botidletime "):
            if channelid in bot_data:
                try:
                    oldval = bot_data[channelid].bot_idletime
                    newval = int(content.split()[1])
                    bot_data[channelid].bot_idletime = newval
                    print(f"Idletime: {channelid} to {newval}")
                    await message.channel.send(f"Idle timeout changed from {oldval} to {newval}.")
                except Exception:
                    bot_data[channelid].bot_idletime = 120
                    await message.channel.send("Sorry, the command failed.")
        elif content.startswith("/botfilteroff"):
            if channelid in bot_data:
                bot_data[channelid].bot_hasfilter = False
                await message.channel.send("Image prompts will no longer be filtered.")
        elif content.startswith("/botfilteron"):
            if channelid in bot_data:
                bot_data[channelid].bot_hasfilter = True
                await message.channel.send("Text-filter will be applied to image prompts.")
        elif content.startswith("/botsavesettings"):
            if channelid in bot_data:
                export_config()
                await message.channel.send("Bot config saved.")
        elif content.startswith("/botmemory "):
            if channelid in bot_data:
                try:
                    memprompt = _command_argument(content, '/botmemory ')
                    bot_data[channelid].bot_override_memory = memprompt
                    print(f"BotMemory: {channelid} to {memprompt}")
                    if memprompt=="":
                        await message.channel.send("Bot memory override for this channel cleared.")
                    else:
                        await message.channel.send("New bot memory override set for this channel.")
                except Exception:
                    await message.channel.send("Sorry, the command failed.")
        elif content.startswith("/botbackend "):
            if channelid in bot_data:
                try:
                    bbe = _command_argument(content, '/botbackend ')
                    bot_data[channelid].bot_override_backend = bbe
                    print(f"BotBackend: {channelid} to {bbe}")
                    if bbe=="":
                        await message.channel.send("Bot backend override for this channel cleared.")
                    else:
                        await message.channel.send("New bot backend override set for this channel.")
                except Exception:
                    await message.channel.send("Sorry, the command failed.")

    # gate before nonwhitelisted channels
    if channelid not in bot_data:
        return
    currchannel = bot_data[channelid]

    # commands anyone can use
    if addressed:
        if content.startswith("/botsleep"):
            instructions=[
                'Entering sleep mode. Ping me to wake me up again.']
            ins = random.choice(instructions)
            # Backdating the timestamp makes the idle window expire immediately.
            currchannel.bot_reply_timestamp = time.time() - 9999
            await message.channel.send(ins)
        elif content.startswith("/botstatus"):
            print(f"Status channel: {channelid}")
            lastreq = int(time.time() - currchannel.bot_reply_timestamp)
            lockmsg = "busy generating a response" if busy.locked() else "awaiting any new requests"
            await message.channel.send(f"I am currently online and {lockmsg}. The last request from this channel was {lastreq} seconds ago.")
        elif content.startswith("/botreset"):
            currchannel.chat_history = []
            currchannel.bot_reply_timestamp = time.time() - 9999
            print(f"Reset channel: {channelid}")
            instructions=[
                "Cleared bot conversation history in this channel."
            ]
            ins = random.choice(instructions)
            await message.channel.send(ins)
        elif content.startswith("/botdraw "):
            # Non-blocking acquire: if another request is in flight, ignore this one.
            if busy.acquire(blocking=False):
                try:
                    if currchannel.bot_hasfilter and detect_nsfw_text(content):
                        await message.channel.send("Sorry, the image prompt filter prevents me from drawing this image.")
                    else:
                        await message.channel.send("I will attempt to draw your image. Please stand by.")
                        async with message.channel.typing():
                            # keep awake on any reply
                            currchannel.bot_reply_timestamp = time.time()
                            genimgprompt = _command_argument(content, '/botdraw ')
                            print(f"Gen Img: {genimgprompt}")
                            payload = prepare_img_payload(channelid,genimgprompt)
                            response = await _post_json(imggen_endpoint, payload)
                            result = ""
                            if response.status_code == 200:
                                imgs = response.json()["images"]
                                if imgs:
                                    result = imgs[0]
                            else:
                                print(f"ERROR: response: {response}")
                            if result:
                                print("Convert and upload file...")
                                # The backend returns the image base64-encoded.
                                file = discord.File(io.BytesIO(base64.b64decode(result)),filename='drawimage.png')
                                await message.channel.send(file=file)
                finally:
                    busy.release()

    # handle regular chat messages
    if message.author == client.user or content.startswith("/"):
        return
    append_history(channelid,message.author.display_name,content)

    replied_author = _replied_author(message)
    is_reply_to_bot = replied_author is not None and replied_author == client.user
    is_reply_someone_else = replied_author is not None and replied_author != client.user
    mentions_bot = client.user in message.mentions
    lowered = content.lower()
    contains_bot_name = (client.user.display_name.lower() in lowered) or (client.user.name.lower() in lowered)

    #get the last message we sent time in seconds
    secsincelastreply = time.time() - currchannel.bot_reply_timestamp

    # Count consecutive messages from other bots; any human message resets the count.
    if message.author.bot:
        currchannel.bot_botloopcount += 1
    else:
        currchannel.bot_botloopcount = 0
    if currchannel.bot_botloopcount > 4:
        return
    elif currchannel.bot_botloopcount == 4:
        # Announce the loop once (only while awake), then stop replying either way.
        if secsincelastreply < currchannel.bot_idletime:
            await message.channel.send("It appears that I am stuck in a conversation loop with another bot or AI. I will refrain from replying further until this situation resolves.")
        return

    awake = secsincelastreply < currchannel.bot_idletime
    if not is_reply_someone_else and (awake or is_reply_to_bot or mentions_bot or contains_bot_name):
        if busy.acquire(blocking=False):
            try:
                async with message.channel.typing():
                    # keep awake on any reply
                    currchannel.bot_reply_timestamp = time.time()
                    payload = prepare_payload(channelid)
                    print(payload)
                    sep = (submit_endpoint if currchannel.bot_override_backend=="" else currchannel.bot_override_backend)
                    response = await _post_json(sep, payload)
                    result = ""
                    if response.status_code == 200:
                        result = response.json()["results"][0]["text"]
                    else:
                        print(f"ERROR: response: {response}")
                    #no need to clean result, if all formatting goes well
                    if result!="":
                        append_history(channelid,client.user.display_name,result)
                        await message.channel.send(result)
            finally:
                busy.release()
# Start the bot; client.run blocks until the client shuts down.
bot_token = os.getenv("BOT_TOKEN")
try:
    client.run(bot_token)
except discord.errors.LoginFailure:
    # Raised when Discord rejects the token.
    print("\n\nBot failed to login to discord")