-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathmain.py
197 lines (162 loc) · 6.58 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
from os import path, environ, remove
from re import compile as compiles
from contextlib import suppress
from typing import Union
from json import dumps
from requests import post, delete
from pyrogram import Client, filters, idle
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton
# Load settings from a local config.env when one exists. The dotenv import is
# kept inside the branch so the package is only needed in that case.
if path.exists('config.env'):
    from dotenv import load_dotenv
    load_dotenv("config.env", override=True)

# One of the paste services from @JusidamaBot on Telegram.
try:
    app = Client(
        "Gist-Telegram-BOT",
        bot_token=environ["BOT_TOKEN"],
        api_id=int(environ["API_ID"]),
        api_hash=environ["API_HASH"],
    )
    app.start()
except Exception as e:
    # A missing/invalid credential or a failed start leaves nothing to run:
    # report the error and stop with a non-zero status.
    print(e)
    # `exit()` is a helper injected by the `site` module for interactive use
    # and may be absent (e.g. `python -S`, frozen builds); raising SystemExit
    # has the same effect and is always available.
    raise SystemExit(1) from None

# User allowed to run commands registered with is_sudo=True.
OWNER_ID = int(environ.get('OWNER_ID', 845077810))
sudo = filters.user()
sudo.add(OWNER_ID)
class Github_Gist:  # Learn class for first time, PR if bad or want to improve
    """Small wrapper around the GitHub Gist REST endpoint.

    The access token is read from the ``GIST_TOKEN`` environment variable
    (a GitHub token with gist-create permission).

    Args:
        title: Name of the single file stored in the gist. Defaults to
            ``Gist Paste by @<bot username>``.
        description: Gist description. Defaults to
            ``Github Gist created by @<bot username> from Telegram``.
        is_secret: Stored as ``self.secret`` and sent unchanged as the
            request's ``public`` field (see the note in ``create``).
    """

    # Seconds to wait for GitHub before giving up. requests applies no
    # timeout unless one is passed, so a stalled connection would otherwise
    # block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, title: str = "", description: str = "", is_secret: bool = False):  # All Params For Custom, Maybe Later
        self.api = "https://api.github.com/gists"
        self.token = environ.get('GIST_TOKEN')  # Github Token with Gist Create Permission
        self.title = title
        self.description = description
        self.secret = is_secret
        self.username = app.me.username
        if not self.title:
            self.title = f"Gist Paste by @{self.username}"
        if not self.description:
            self.description = f"Github Gist created by @{self.username} from Telegram"

    def _auth_headers(self) -> dict:
        """Return the Authorization header, failing early when no token is set."""
        if not self.token:
            # Without this the request would be sent as "token None" and only
            # surface as a generic failure.
            raise ValueError("ERROR : GIST_TOKEN Is Not Set")
        return {"Authorization": f"token {self.token}"}

    def create(self, text: str) -> dict:
        """Create a gist whose single file contains ``text``.

        Returns:
            ``{"url": <html_url>, "raw": <raw_url or None>}``. Spaces in the
            raw URL are replaced with ``%20``; ``raw`` is ``None`` when the
            response lists no file.

        Raises:
            ValueError: the token is missing, the response status is not
                200/201, or the response body carries a ``message`` field.
            Exceptions raised by ``requests`` (e.g. on timeout) propagate.
        """
        payload = {
            "description": self.description,
            # NOTE(review): `is_secret` is forwarded as-is into GitHub's
            # `public` field, so is_secret=False currently produces a
            # non-public gist and is_secret=True a public one -- the opposite
            # of what the parameter name suggests. Left unchanged on purpose:
            # flipping it would make the default pastes public. Decide
            # deliberately before changing.
            "public": self.secret,
            "files": {self.title: {"content": text}},
        }
        headers = {**self._auth_headers(), "Content-Type": "application/json"}
        resp = post(self.api, headers=headers, data=dumps(payload), timeout=self.REQUEST_TIMEOUT)
        if resp.status_code not in [200, 201]:
            raise ValueError("ERROR : Failed To Create Github Gist")
        resp = resp.json()
        if resp.get('message'):
            raise ValueError(f"ERROR : {resp.get('message')}")
        # Guard against a response without any file entry instead of
        # crashing on an empty list / missing key.
        files = list((resp.get('files') or {}).values())
        raw_url = files[0].get('raw_url') if files else None
        return {
            "url": resp.get('html_url'),
            "raw": raw_url.replace(' ', '%20') if raw_url else None,
        }

    def delete(self, ids: str) -> str:
        """Delete the gist with id ``ids`` and return a confirmation message.

        Raises:
            ValueError: the token is missing or GitHub did not answer with a
                success status.
            Exceptions raised by ``requests`` (e.g. on timeout) propagate.
        """
        # Imported locally under an alias: the module-level name `delete`
        # is rebound later in this file by the /delete message handler, so a
        # bare `delete(...)` here would call that handler, not requests.delete.
        from requests import delete as http_delete

        resp = http_delete(f'{self.api}/{ids}', headers=self._auth_headers(), timeout=self.REQUEST_TIMEOUT)
        if resp.ok:
            return f"Success Delete Gist With ID {ids}"
        raise ValueError("ERROR : Failed To Delete Github Gist")
# Custom Filters, to support bot username
def command(command: Union[str, list], prefix: Union[str, list] = None, is_sudo: bool = False):
    """Build a command filter that also matches the ``name@botusername`` form.

    Args:
        command: One command name, or a list of names.
        prefix: Accepted command prefix(es); ``/`` and ``.`` when omitted.
        is_sudo: When true, the filter is additionally combined with ``sudo``.

    Returns:
        The resulting filter object.
    """
    prefix = prefix or ["/", "."]
    bot_name = app.me.username
    # Normalise to a list so both input shapes share one code path.
    base_names = command if isinstance(command, list) else [command]
    # Each name is followed directly by its "@username" variant.
    triggers = [
        variant
        for name in base_names
        for variant in (name, f'{name}@{bot_name}')
    ]
    built = filters.command(triggers, prefix)
    return built & sudo if is_sudo else built
# Size Checker for Limit
def humanbytes(size):
    """Format a byte count as a short human-readable string.

    Args:
        size: Number of bytes. Anything ``int()`` accepts is allowed; values
            that cannot be converted (``None``, non-numeric text, ...) are
            treated as zero.

    Returns:
        A string such as ``"512 B"``, ``"1.5 KB"`` or ``"1.0 MB"`` using
        1024-based units, ``"0 B"`` for zero/invalid input, or
        ``"Can't Define Real Size !"`` when the value exceeds the largest
        known unit (YB).
    """
    try:
        size = int(size)
    except (TypeError, ValueError, OverflowError):
        # Unusable input is reported as empty rather than raising.
        return "0 B"
    if not size:
        return "0 B"

    power = 2**10
    units = ("", "K", "M", "G", "T", "P", "E", "Z", "Y")
    exponent = 0
    # `>=` so an exact multiple moves up a unit (1024 -> "1.0 KB",
    # 1024 * 1024 -> "1.0 MB") instead of printing "1024 ...".
    while size >= power:
        size /= power
        exponent += 1

    if exponent >= len(units):
        return "Can't Define Real Size !"
    return f"{round(size, 2)} {units[exponent]}B"
# MIME types accepted for pasting: anything starting with "text/", plus types
# ending in json, yaml, xml, toml, x-sh or x-shellscript. Matched with
# .search() against a document's mime_type. PR if you want to add more.
pattern = compiles(r"^text/|json$|yaml$|xml$|toml$|x-sh$|x-shellscript$")
@app.on_message(command('start'))
async def start(_, message):
    """Answer the start command with a fixed text, quoting the sender's message."""
    sent = await message.reply_text("Bot Works", quote=True)
    return sent
@app.on_message(command('create'))
async def create(_, message):
    """Paste text to a GitHub Gist and reply with links to it.

    The content is taken, in order of preference, from: a replied-to
    document (text-like MIME type, at most 1 MiB), the replied-to message's
    text or caption, or the text following the command itself. Progress and
    errors are reported by editing a single status reply.
    """
    reply = message.reply_to_message
    target = str(message.command[0]).split("@", maxsplit=1)[0]
    if not reply and len(message.command) < 2:
        return await message.reply_text(f"**Reply To A Message With /{target} or with command**", quote=True)

    msg = await message.reply_text("`Pasting to Github Gist...`", quote=True)
    data = ''
    limit = 1024 * 1024
    if reply and reply.document:
        # file_size / mime_type are presumably optional on a document
        # (verify against pyrogram's Document type); treat missing values as
        # "0 bytes" / "no type" instead of crashing on None.
        if (reply.document.file_size or 0) > limit:
            return await msg.edit(f"**You can only paste files smaller than {humanbytes(limit)}.**")
        if not pattern.search(reply.document.mime_type or ""):
            return await msg.edit("**Only text files can be pasted.**")
        file = await reply.download()
        if not file:
            # No path came back, so there is nothing to read or clean up.
            return await msg.edit('`File Not Supported !`')
        try:
            # Explicit UTF-8 so decoding does not depend on the host locale.
            with open(file, "r", encoding="utf-8") as text:
                data = text.read()
        except UnicodeDecodeError:
            return await msg.edit('`File Not Supported !`')
        finally:
            # Always remove the downloaded copy, whatever happened above.
            with suppress(FileNotFoundError):
                remove(file)
    elif reply and (reply.text or reply.caption):
        data = reply.text or reply.caption
    elif len(message.command) >= 2:
        # Also reached when the replied-to message carries no text but the
        # command has inline arguments. The command may have arrived as a
        # media caption, in which case message.text is empty.
        data = (message.text or message.caption).split(None, 1)[1]

    if not data:
        # Nothing usable to paste (e.g. reply to a sticker, or an empty
        # file): say so without sending an empty gist to the API.
        return await msg.edit("Text Too Short Or File Problems")

    if message.from_user:
        if message.from_user.username:
            uname = f"@{message.from_user.username}"
        else:
            uname = f'[{message.from_user.first_name}](tg://user?id={message.from_user.id})'
    else:
        # No user sender: fall back to the title of the sending chat.
        uname = message.sender_chat.title

    try:
        resp = Github_Gist().create(data)
        url = resp.get("url")
        raw = resp.get("raw")
    except Exception as e:
        # Show the failure reason to the user instead of dying silently.
        await msg.edit(f"`{e}`")
        return

    if not url:
        return await msg.edit("Text Too Short Or File Problems")

    button = []
    if raw is not None:
        button.append([InlineKeyboardButton("Open Link", url=url), InlineKeyboardButton("Raw Link", url=raw)])
    else:
        button.append([InlineKeyboardButton("Open Link", url=url)])
    button.append([InlineKeyboardButton("Share Link", url=f"https://telegram.me/share/url?url={url}")])

    pasted = f"**Here's your Github Gist URL successfully pasted.\n\nPaste by {uname}**"
    await msg.edit(pasted, reply_markup=InlineKeyboardMarkup(button))
@app.on_message(command('delete', is_sudo=True))
async def delete(_, message):
    """Delete the gist whose id follows the command and reply with the outcome.

    Registered with ``is_sudo=True``. When no id is supplied, a usage hint is
    sent instead. Any error raised while deleting is turned into the reply
    text rather than propagated.
    """
    args = message.command
    if len(args) < 2:
        # No gist id after the command: show how to call it.
        return await message.reply_text(f"**/{args[0]} GIST_ID**", quote=True)

    gist_id = args[1]
    try:
        outcome = Github_Gist().delete(gist_id)
    except Exception as err:
        outcome = str(err)
    return await message.reply_text(outcome, quote=True)
# Keep the process alive while handlers run (idle() is pyrogram's helper;
# presumably it returns once the process is asked to stop -- confirm), then
# stop the client that was started at the top of the file.
idle()
app.stop()