-
Notifications
You must be signed in to change notification settings - Fork 9
/
cvealert.py
136 lines (107 loc) · 4.04 KB
/
cvealert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/python3
# ------------------ conf start -----------------------------
# Put the group-robot webhook URLs here, one per line. DingTalk and
# WeCom (WeChat Work) robots are supported.
# For DingTalk, set the robot's custom keyword to `CVE` or use an IP allowlist.
# The literal is split on newlines, so each line becomes one webhook entry;
# empty entries are skipped when messages are sent.
webhooks = """\
""".split("\n")
# Put the keywords here. Each entry is compiled as a case-insensitive regular
# expression and searched for in the CVE description; when the list is empty,
# no keyword filtering is applied.
keywords = [
# "windows",
]
# Notification message template (markdown).
# Variables available in the template: $cveId, $description, $refs, $published, $sourceIdentifier
template = """\
**[$cveId](https://nvd.nist.gov/vuln/detail/$cveId)**
> $description
References:
> $refs
"""
# ------------------ conf end -------------------------------
from calendar import timegm
from string import Template
import requests
import time
import sys
import os
import re
# HTTP headers sent with the NVD API request made in getCve().
headers = {
"Referer": "https://github.com/p7e4/CVE-Alert",
"User-Agent": "CVE-Alert"
}
# One case-insensitive pattern per configured keyword. Each keyword is wrapped
# in a capture group so a hit can be re-emitted via the `\1` back-reference
# (getCve() uses this to wrap matches in `**`). Keywords are not escaped, so
# they are interpreted as regular expressions.
keywordRegx = [re.compile(f"({i})", re.I) for i in keywords]
def getCve(lastRunTimestamp):
    """Fetch CVEs published since ``lastRunTimestamp`` and render them as messages.

    Queries the NVD CVE API 2.0 for entries whose publication date lies between
    ``lastRunTimestamp`` and the current time (both formatted from UTC), keeps
    only entries whose description matches at least one pattern in the
    module-level ``keywordRegx`` (all entries when that list is empty), and
    renders each kept entry with the module-level ``template``.

    Args:
        lastRunTimestamp: Unix timestamp in seconds marking the start of the
            publication window.

    Returns:
        A list of rendered message strings, one per kept CVE, in the order the
        API returned them.

    Raises:
        Exception: any error raised while requesting or decoding the API
            response is re-raised after the response (if one was received)
            has been printed for diagnosis.
    """
    # Explicit %H:%M:%S instead of %X: %X is locale-dependent and could
    # produce a timestamp the API cannot parse under a non-C locale.
    timeFormat = "%Y-%m-%dT%H:%M:%S"
    now = time.strftime(timeFormat + ".000", time.gmtime(time.time()))
    last = time.strftime(timeFormat + ".000", time.gmtime(lastRunTimestamp))
    url = f"https://services.nvd.nist.gov/rest/json/cves/2.0/?resultsPerPage=2000&pubStartDate={last}&pubEndDate={now}"

    # Stays None when the request itself fails, so the diagnostic print below
    # cannot raise a NameError that would hide the real error.
    response = None
    try:
        response = requests.get(url, timeout=30, headers=headers)
        vulnerabilities = response.json()["vulnerabilities"]
    except Exception:
        if response is not None:
            print(f"[{response.status_code}] {response.url} {response.text}")
        raise

    messageTemplate = Template(template)
    data = []
    for item in vulnerabilities:
        cve = item["cve"]
        description = cve["descriptions"][0]["value"].replace("\n\n", "\n").rstrip("\r\n")

        if keywordRegx:
            matched = False
            for pattern in keywordRegx:
                if pattern.search(description):
                    matched = True
                    # Wrap every hit in ** so it renders bold in markdown.
                    description = pattern.sub("**\\1**", description)
            if not matched:
                continue

        refs = "\n".join(f"[{ref['url']}]({ref['url']})" for ref in cve["references"])
        # Drop the trailing 4 characters (the fractional-seconds suffix) before
        # parsing, then shift the UTC time by +8 hours for display.
        publishedUtc = timegm(time.strptime(cve["published"][:-4], timeFormat))
        published = time.strftime("%Y-%m-%d %H:%M", time.gmtime(publishedUtc + 60 * 60 * 8))

        data.append(messageTemplate.safe_substitute(
            cveId=cve["id"],
            description=description,
            refs=refs,
            published=published,
            sourceIdentifier=cve["sourceIdentifier"],
        ))
    return data
def main():
    """Send alerts for CVEs published since the previous run and record this run.

    The timestamp of the last run is kept in a ``.cvealert`` file located in
    ``sys.path[0]``. When that file is missing or does not contain an integer,
    the run is treated as a first run: the last 24 hours are queried and only
    the first five results are sent. The record file is rewritten with the
    start time of this run only after the messages have been sent.
    """
    runtime = int(time.time())
    recordFile = os.path.join(sys.path[0], ".cvealert")

    lastRun = None
    try:
        with open(recordFile) as f:
            lastRun = int(f.read())
    except FileNotFoundError:
        pass
    except ValueError:
        # An empty or corrupt record (e.g. an interrupted write) would
        # otherwise crash every later run; fall back to first-run behaviour.
        print(f"记录文件内容无效, 按首次运行处理: `{recordFile}`")

    if lastRun is None:
        sendMessages(getCve(time.time() - 60 * 60 * 24)[:5])
    else:
        # Start one second before the recorded time so the window overlaps
        # the previous run's end.
        sendMessages(getCve(lastRun - 1))

    with open(recordFile, "w") as f:
        f.write(str(runtime))
def sendMessages(messages):
    """Post each message to every configured webhook as a markdown message.

    Messages are sent in reverse list order. Messages longer than 4096
    characters are truncated and suffixed with "...". The payload shape is
    chosen from the webhook URL prefix (DingTalk or WeCom); an unrecognised
    URL terminates the program via ``sys.exit``. A reply that is not JSON or
    whose ``errcode`` is not 0 is reported on stdout and sending continues.

    DingTalk: https://open.dingtalk.com/document/group/custom-robot-access
    WeCom:    https://developer.work.weixin.qq.com/document/path/91770

    Args:
        messages: list of markdown strings to send.

    Returns:
        None.
    """
    if not messages:
        return

    sentCount = 0
    # Context manager so the session's connections are released on exit.
    with requests.Session() as session:
        for message in reversed(messages):
            if len(message) > 4096:
                message = message[:4092] + "..."

            for webhook in webhooks:
                if not webhook:
                    continue
                if webhook.startswith("https://oapi.dingtalk.com/robot/send?access_token="):
                    send = {
                        "msgtype": "markdown",
                        "markdown": {
                            "title": "有新的推送消息",
                            "text": message,
                        },
                    }
                elif webhook.startswith("https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key="):
                    send = {
                        "msgtype": "markdown",
                        "markdown": {
                            "content": message,
                        },
                    }
                else:
                    # sys.exit rather than the site-provided exit(), which is
                    # not guaranteed to exist (e.g. `python -S`).
                    sys.exit(f"未知的通知地址: `{webhook}`")

                r = session.post(webhook, json=send, timeout=20)
                try:
                    errcode = r.json().get("errcode")
                except (ValueError, AttributeError):
                    # Non-JSON or non-object reply: report it as a failed send
                    # instead of aborting the remaining messages.
                    errcode = None
                if errcode != 0:
                    print(f"发送消息失败: `{webhook}` {r.text}")
                    print(f"message: {message}")

            # NOTE(review): the source's indentation was lost; the throttle is
            # reconstructed as per-message (pause after every 20th message) —
            # confirm against the upstream file.
            sentCount += 1
            if sentCount % 20 == 0:
                time.sleep(60)
# Run the alert job only when this file is executed as a script.
if __name__ == "__main__":
    main()