-
Notifications
You must be signed in to change notification settings - Fork 0
/
run.py
138 lines (114 loc) · 4.88 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import argparse
import configparser
import email
import imaplib
import logging
import os
import re
import time
from selenium import webdriver
# Module-level logger; handlers and level are set by logging.basicConfig()
# in the ``__main__`` guard at the bottom of this file.
logger = logging.getLogger(__name__)
def main():
    """Open every dondino paid link found in the configured IMAP mailboxes.

    Reads the ``.ini`` file given by ``-c/--config-file`` (default
    ``config.ini``) and treats each section as one mail account:

    1. log in to the IMAP server and search the mailbox for dondino mails,
    2. extract the paid link from each text part of every mail found,
    3. open all links in new Chrome windows through Selenium and wait two
       minutes before closing the browser,
    4. when ``auto_prune`` is enabled, mark the handled mails as deleted and
       expunge them.

    Section options: ``host``, ``user`` and ``pass`` are required; ``port``
    (default 993), ``mailbox`` (default "INBOX"), ``auto_prune`` (default
    false), ``headless`` (default false), ``trash_mailbox`` and
    ``chromedriver_path`` are optional.

    Raises:
        SystemExit: with status 1 when the configuration file does not exist.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "-c",
        "--config-file",
        dest="config_file",
        default="config.ini",
        help='Absolute or relative path to the configuration file. Must be an ".ini" format',
    )
    args = arg_parser.parse_args()
    config_file = args.config_file

    if not os.path.exists(config_file):
        logger.error(f"Config file {config_file} not exists")
        # The bare ``exit`` helper is only injected by the ``site`` module and
        # is missing under ``python -S``; SystemExit is always available.
        raise SystemExit(1)

    logger.info(f"Read config file {config_file}")
    config_parser = configparser.ConfigParser()
    config_parser.read(config_file)

    # Compiled once; reused for every mail part of every section.
    link_pattern = re.compile(r"https://dondino\.de/link/\?\w+")

    for section in config_parser.sections():
        logger.info(f"Process section {section}")
        host = config_parser.get(section, "host")
        port = config_parser.getint(section, "port", fallback=993)
        user = config_parser.get(section, "user")
        passw = config_parser.get(section, "pass")
        mailbox = config_parser.get(section, "mailbox", fallback="INBOX")
        auto_prune = config_parser.getboolean(section, "auto_prune", fallback=False)
        # Only needed for Gmail accounts with auto_prune, so it is optional.
        trash_mailbox = config_parser.get(section, "trash_mailbox", fallback=None)
        # Optional: without it Selenium is left to locate the driver itself.
        chromedriver_path = config_parser.get(
            section, "chromedriver_path", fallback=None
        )
        # getboolean, not get: a plain get() returns the string "false", which
        # is truthy and would always switch headless mode on.
        headless = config_parser.getboolean(section, "headless", fallback=False)

        logger.info(f"Connect to IMAP server {host}:{port}")
        # The context manager logs out if anything below raises, so a failing
        # section does not leak its connection.
        with imaplib.IMAP4_SSL(host, port) as imap:
            imap.login(user, passw)

            logger.info(f"Select mailbox {mailbox} and search for dondino mails")
            imap.select(mailbox)
            # NOTE(review): the sender address below looks like it was redacted
            # by an e-mail-protection filter - confirm the real address.
            _, messages = imap.search(None, '(FROM "[email protected]")')
            ids = messages[0].split()
            logger.info(f"Found {len(ids)} dondino mails")

            links = {}
            for mail_id in ids:
                # NOTE(review): RFC822.TEXT returns the body without headers,
                # so the parser cannot see Content-Type or transfer encoding -
                # confirm this is sufficient for the mails being processed.
                _, data = imap.fetch(mail_id, "(RFC822.TEXT)")
                _, byte_data = data[0]
                email_message = email.message_from_bytes(byte_data)
                for email_part in email_message.walk():
                    if email_part.get_content_type() not in ("text/plain", "text/html"):
                        continue
                    content_byte = email_part.get_payload(decode=True)
                    if content_byte is None:
                        continue
                    # Honour the declared charset and never abort the whole
                    # run because of a single undecodable byte.
                    charset = email_part.get_content_charset() or "utf-8"
                    try:
                        content = content_byte.decode(charset, errors="replace")
                    except LookupError:
                        # Unknown charset name in the mail.
                        content = content_byte.decode("utf-8", errors="replace")

                    link_matcher = link_pattern.search(content)
                    if link_matcher:
                        link = link_matcher.group(0)
                        links[mail_id.decode()] = link
                        logger.info(f"Found dondino paid link {link}")
                    elif auto_prune:
                        # Mails without a paid link are pruned right away.
                        _mark_mail_as_deleted(imap, mail_id)

            if links:
                logger.info(f"Found {len(links)} dondino paid links")
                if len(ids) != len(links):
                    logger.info(
                        "There are some dondino mails that may be “Bonus” or “Info” mails and do not contain paid links"
                    )

                options = webdriver.ChromeOptions()
                options.add_argument("--no-first-run")
                options.add_argument("--no-default-browser-check")
                options.add_argument("--disable-search-engine-choice-screen")
                if headless:
                    options.add_argument("--headless")

                service = None
                if chromedriver_path:
                    service = webdriver.ChromeService(executable_path=chromedriver_path)

                driver = webdriver.Chrome(options=options, service=service)
                try:
                    driver.get("https://dondino.de")
                    for mail_id, link in links.items():
                        logger.info(f"Open dondino paid link {link}")
                        driver.execute_script(f"window.open('{link}');")
                        if auto_prune:
                            _mark_mail_as_deleted(imap, mail_id)

                    logger.info(
                        "Wait 2 minutes before closing the browser to ensure that all dondino pages are fully loaded"
                    )
                    time.sleep(120)
                finally:
                    # Always shut the browser down, even if a page fails.
                    driver.quit()

                if auto_prune:
                    logger.info("Delete all found dondino mails")
                    if _is_gmail_host(imap):
                        if trash_mailbox:
                            # "1:*" flags every message in the trash mailbox,
                            # not only the dondino mails moved there.
                            imap.select(mailbox=trash_mailbox)
                            imap.store("1:*", "+FLAGS", r"(\Deleted)")
                        else:
                            logger.warning(
                                "No trash_mailbox configured; dondino mails stay in the Gmail trash"
                            )
                    imap.expunge()
            else:
                logger.info(f"No dondino paid links found")

            logger.info("Disconnect from IMAP server")
            imap.close()
            imap.logout()
def _mark_mail_as_deleted(imap, id):
    """Mark the message ``id`` on ``imap`` so that it can be removed later.

    On a Gmail host the message gets the Trash label added through the
    X-GM-LABELS store item; on any other host the Deleted flag is added.
    """
    if not _is_gmail_host(imap):
        imap.store(id, "+FLAGS", r"(\Deleted)")
        return
    imap.store(id, "+X-GM-LABELS", r"(\Trash)")
def _is_gmail_host(imap):
    """Return True when ``imap.host`` is ``gmail.com`` or one of its subdomains.

    The comparison ignores case and a trailing dot, and only matches on a DNS
    label boundary, so ``IMAP.GMAIL.COM`` is accepted while an unrelated
    domain such as ``notgmail.com`` is not.
    """
    host = imap.host.lower().rstrip(".")
    return host == "gmail.com" or host.endswith(".gmail.com")
if __name__ == "__main__":
    # Script entry point: set up root logging at INFO level with a
    # timestamped format, then run the mail-processing routine.
    log_format = "%(asctime)s - %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)
    main()