-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathidbot.py
171 lines (128 loc) · 4.63 KB
/
idbot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/local/bin/python3
# coding: utf-8
__author__ = "Benny <[email protected]>"
import logging
import os
import re
import traceback
from typing import Any, Union
from pyrogram import Client, filters, types, raw
from tgbot_ping import get_runtime
# Root logger: INFO and above, each record prefixed with the timestamp,
# the emitting file name and the level.
logging.basicConfig(
    format='%(asctime)s - %(filename)s [%(levelname)s]: %(message)s',
    level=logging.INFO,
)

# Settings are read from the environment once, at import time; each one is
# None when its variable is not set.
PROXY = os.environ.get("PROXY")
TOKEN = os.environ.get("TOKEN")
APP_ID = os.environ.get("APP_ID")
APP_HASH = os.environ.get("APP_HASH")
# Telegram data-center id -> city, numbered from 1.
# Source: https://docs.pyrogram.org/faq/what-are-the-ip-addresses-of-telegram-data-centers
DC_MAP = dict(
    enumerate(
        ("Miami", "Amsterdam", "Miami", "Amsterdam", "Singapore"),
        start=1,
    )
)
def create_app():
    """Build the pyrogram ``Client`` used by the bot.

    The client is created under the session name "idbot" from the
    module-level ``APP_ID``, ``APP_HASH`` and ``TOKEN`` values.  When
    ``PROXY`` is set it is read as ``host:port`` and attached to the client
    as a SOCKS5 proxy.

    Returns:
        The configured client; this function does not start it.
    """
    client = Client("idbot", APP_ID, APP_HASH, bot_token=TOKEN)
    if not PROXY:
        return client

    pieces = PROXY.split(":")
    client.proxy = {
        "scheme": "socks5",
        "hostname": pieces[0],
        "port": int(pieces[1]),
    }
    return client
# Client instance that the @app.on_message handlers below are registered on.
app = create_app()
# Running total of get_user_detail / get_channel_detail calls in this process.
service_count = 0
def get_user_detail(user: "Union[types.User, types.Chat]") -> "str":
    """Render the details of a user or chat as a reply text.

    Every call, including one made with ``None``, increments the
    module-level ``service_count``.

    Args:
        user: The user or chat to describe, or ``None`` when no origin is
            available (callers pass the origin of a forwarded message).

    Returns:
        A multi-line text block with the username, names, id, bot flag,
        data center, language code and phone number; or a short notice when
        ``user`` is ``None``.
    """
    global service_count
    service_count += 1
    if user is None:
        return "Can't get hidden forwards!"

    # ``title`` is looked up defensively: an object without a first name and
    # without a ``title`` attribute is rendered as None instead of raising
    # AttributeError.
    display_name = user.first_name or getattr(user, "title", None)
    return f"""
user name: `@{user.username} `
first name: `{display_name}`
last name: `{user.last_name}`
user id: `{user.id}`
is bot: {getattr(user, "is_bot", None)}
DC: {user.dc_id} {DC_MAP.get(user.dc_id, "")}
language code: {getattr(user, "language_code", None)}
phone number: {getattr(user, "phone_number", None)}
"""
def get_channel_detail(channel) -> "str":
    """Render the first chat of a channel lookup result as a reply text.

    Increments the module-level ``service_count`` on every call.

    Args:
        channel: Lookup result exposing a ``chats`` sequence; only its first
            entry is used.

    Returns:
        A multi-line text block with the chat's username, title and id (the
        id is shown with a ``-100`` prefix).
    """
    global service_count
    service_count += 1

    first_chat = channel.chats[0]
    return f"""
Channel/group detail(you can also forward message to see detail):
name: `@{first_chat.username} `
title: `{first_chat.title}`
id: `-100{first_chat.id}`
"""
@app.on_message(filters.command(["start"]))
def start_handler(client: "Client", message: "types.Message"):
    """Answer /start by sending a welcome message to the originating chat."""
    client.send_message(message.chat.id, "Welcome to Benny's ID bot.")
@app.on_message(filters.command(["help"]))
def help_handler(client: "Client", message: "types.Message"):
    """Answer /help with usage instructions and the project link."""
    usage = """Forward messages, send username, use /getme to get your account's detail.\n
Opensource at GitHub: https://github.com/tgbot-collection/IDBot
"""
    client.send_message(message.chat.id, usage)
@app.on_message(filters.command(["getme"]))
def getme_handler(client: "Client", message: "types.Message"):
    """Answer /getme with the sender's own details, quoting the command."""
    message.reply_text(get_user_detail(message.from_user), quote=True)
@app.on_message(filters.command(["ping"]))
def start_handler(client: "Client", message: "types.Message"):
    """Answer /ping with the runtime report from ``get_runtime``.

    When the chat's username is "BennyThink" the current ``service_count``
    is appended to the report.
    """
    # NOTE(review): this reuses the name of the /start handler defined
    # earlier in the file, so the module-level name ends up bound to this
    # function; consider renaming it (e.g. ping_handler).
    logging.info("Pong!")
    target = message.chat.id
    # NOTE(review): "botsrunner_idbot_1" is presumably the container name
    # the bot runs under; confirm against the deployment.
    report = get_runtime("botsrunner_idbot_1")
    if getattr(message.chat, "username", None) == "BennyThink":
        report = f"{report}\n\nService count:{service_count}"
    client.send_message(target, report)
@app.on_message(filters.command(["getgroup"]))
def getgroup_handler(client: "Client", message: "types.Message"):
    """Answer /getgroup with the details of the chat the command was sent in."""
    message.reply_text(get_user_detail(message.chat), quote=True)
@app.on_message(filters.text & filters.group)
def getgroup_compatibly_handler(client: "Client", message: "types.Message"):
    """Handle ``/getgroup@<name>bot`` sent as plain text in a group.

    Text that does not match that form, and messages forwarded from a
    channel, are ignored with a warning; everything else is delegated to
    ``getgroup_handler``.
    """
    origin_type = getattr(message.forward_from_chat, "type", None)
    # The chat type may be a plain string or an enum member whose ``value``
    # is the lowercase name, depending on the pyrogram release; normalise it
    # so the comparison below works for both.
    origin_type = getattr(origin_type, "value", origin_type)

    is_command = re.search(r"^/getgroup@.*bot$", message.text) is not None
    if origin_type == "channel" or not is_command:
        logging.warning("this is from channel or non-command text")
        return

    logging.info("compatibly getgroup")
    getgroup_handler(client, message)
@app.on_message(filters.forwarded & filters.private)
def forward_handler(client: "Client", message: "types.Message"):
    """Reply to a forwarded private message with its origin's details.

    The origin is the forwarding user when present, otherwise the chat the
    message was forwarded from; when neither is set ``get_user_detail``
    receives ``None``.
    """
    origin = message.forward_from
    if not origin:
        origin = message.forward_from_chat
    message.reply_text(get_user_detail(origin), quote=True)
def get_users(username):
    """Look ``username`` up as a user via the bot client and format the result."""
    return get_user_detail(app.get_users(username))
def get_channel(username):
    """Look ``username`` up as a channel/group and format the result.

    The name is resolved to a peer first, then a raw ``GetChannels`` request
    for that single peer is sent and its result is formatted.
    """
    resolved: "Union[raw.base.InputChannel, Any]" = app.resolve_peer(username)
    request = raw.functions.channels.GetChannels(id=[resolved])
    return get_channel_detail(app.invoke(request))
@app.on_message(filters.text & filters.private)
def private_handler(client: "Client", message: "types.Message"):
    """Reply to a private text message with the details of the named entity.

    Leading ``@`` signs and a ``t.me`` link prefix (with or without an
    ``http``/``https`` scheme) are removed from the text.  The remainder is
    looked up as a user first and as a channel/group second; the first
    lookup that yields text wins.  If every lookup raises, the last error's
    message is sent instead.
    """
    username = re.sub(r"@+|(?:https?://)?t\.me/", "", message.text)

    reply = ""
    for lookup in (get_users, get_channel):
        try:
            reply = lookup(username)
        except Exception as e:
            # Boundary handler: record the traceback and keep going so the
            # next lookup still gets a chance.
            logging.exception("%s failed for %r", lookup.__name__, username)
            # Always send a non-empty string: fall back to repr() for
            # exceptions that carry no message.
            reply = str(e) or repr(e)
        else:
            if reply:
                break
    message.reply_text(reply, quote=True)
if __name__ == "__main__":
    # Run the client only when this file is executed as a script.
    app.run()