forked from MParvin/TR-CMD
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbot.py
156 lines (132 loc) · 4.88 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import subprocess
import configparser
import os
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters
import logging
import ast
config = configparser.ConfigParser()
config.read("config")
### Get admin chat_id from config file
### For more security replies only send to admin chat_id
adminCID = config["SecretConfig"]["admincid"]
adminCID = ast.literal_eval(adminCID)
### Enable logging
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)
### This function run command and send output to user
def runCMD(bot, update):
if not isAdmin(bot, update):
return
chat_id = update.message.chat_id
usercommand = update.message.text
cmdProc = subprocess.Popen(
usercommand, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
)
cmdOut, cmdErr = cmdProc.communicate()
if cmdOut:
chunk_size=4000
if len(cmdOut)>chunk_size:
cmdOut = [ cmdOut[i:i+chunk_size] for i in range(0, len(cmdOut), chunk_size) ]
for message in cmdOut:
bot.sendMessage(text=str(message, "utf-8"), chat_id=chat_id)
else:
bot.sendMessage(text=str(cmdOut, "utf-8"), chat_id=chat_id)
else:
chunk_size=4000
if len(cmdErr)>chunk_size:
cmdErr = [ cmdErr[i:i+chunk_size] for i in range(0, len(cmdErr), chunk_size) ]
for message in cmdErr:
bot.sendMessage(text=str(message, "utf-8"), chat_id=chat_id)
else:
bot.sendMessage(text=str(cmdErr, "utf-8"), chat_id=chat_id)
### This function ping 8.8.8.8 and send you result
def ping8(bot, update):
if not isAdmin(bot, update):
return
chat_id = update.message.chat_id
cmdOut = str(
subprocess.check_output(
"ping", "8.8.8.8 -c4", stderr=subprocess.STDOUT, shell=True
),
"utf-8",
)
bot.sendMessage(text=cmdOut, chat_id=chat_id)
def startCMD(bot, update):
if not isAdmin(bot, update):
return
chat_id = update.message.chat_id
bot.sendMessage(
text="Welcome to TSMB bot, this is Linux server/PC manager, Please use /help and read carefully!!",
chat_id=chat_id,
)
def helpCMD(bot, update):
if not isAdmin(bot, update):
return
chat_id = update.message.chat_id
bot.sendMessage(
text="This bot has access to your server/PC, So it can do anything. Please use Telegram local password to prevent others from accessing to this bot.",
chat_id=chat_id,
)
def topCMD(bot, update):
if not isAdmin(bot, update):
return
chat_id = update.message.chat_id
cmdOut = str(subprocess.check_output("top -n 1", shell=True), "utf-8")
bot.sendMessage(text=cmdOut, chat_id=chat_id)
def HTopCMD(bot, update):
## Is this user admin?
if not isAdmin(bot, update):
return
chat_id = update.message.chat_id
## Checking requirements on your system
htopCheck = subprocess.call(["which", "htop"])
if htopCheck != 0:
bot.sendMessage(
text="htop is not installed on your system, Please install it first and try again",
chat_id=chat_id,
)
return
ahaCheck = subprocess.call(["which", "aha"])
if ahaCheck != 0:
bot.sendMessage(
text="aha is not installed on your system, Please install it first and try again",
chat_id=chat_id,
)
return
os.system("echo q | htop | aha --black --line-fix > ./htop-output.html")
with open("./htop-output.html", "rb") as fileToSend:
bot.sendDocument(document=fileToSend, chat_id=chat_id)
if os.path.exists("./htop-output.html"):
os.remove("./htop-output.html")
def error(bot, update, error):
"""Log Errors caused by Updates."""
logger.warning('Update "%s" caused error "%s"', update, error)
def isAdmin(bot, update):
chat_id = update.message.chat_id
if int(chat_id) in adminCID:
return True
update.message.reply_text(
"You cannot use this bot, because you are not Admin!!!!"
)
alertMessage = """Some one try to use this bot with this information:\n chat_id is {} and username is {} """.format(
update.message.chat_id, update.message.from_user.username
)
for admin in adminCID:
bot.sendMessage(text=alertMessage, chat_id=admin)
return False
def main():
updater = Updater(config["SecretConfig"]["Token"])
dp = updater.dispatcher
dp.add_handler(CommandHandler("start", startCMD))
dp.add_handler(CommandHandler("ping8", ping8))
dp.add_handler(CommandHandler("top", topCMD))
dp.add_handler(CommandHandler("htop", HTopCMD))
dp.add_handler(CommandHandler("help", helpCMD))
dp.add_handler(MessageHandler(Filters.text, runCMD))
dp.add_error_handler(error)
updater.start_polling()
updater.idle()
if __name__ == "__main__":
main()