-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
171 lines (138 loc) · 7.54 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import random
from time import sleep
from datetime import datetime, timedelta, time
from email_config import EmailConfig
# Format time as a human-readable string
def format_time(seconds) -> str:
    """Format a duration in seconds as a compact ``"1h 2m 3s"`` style string.

    The input is truncated to whole seconds with ``int()``. Units whose value
    is zero are omitted, except that a zero-length duration is rendered as
    ``"0s"``. Negative durations are formatted by magnitude and prefixed
    with ``-`` (e.g. ``-65`` -> ``"-1m 5s"``).

    :param seconds: duration in seconds (any value accepted by ``int()``)
    :return: the formatted duration
    """
    total = int(seconds)
    # divmod() floors toward negative infinity, so feeding it a negative value
    # would yield a bogus positive remainder (-5 would come out as "59m 55s").
    # Format the magnitude and carry the sign separately instead.
    sign = "-" if total < 0 else ""
    hours, remainder = divmod(abs(total), 3600)
    minutes, secs = divmod(remainder, 60)

    parts = []
    if hours:
        parts.append(f"{hours}h")
    if minutes:
        parts.append(f"{minutes}m")
    # Always emit the seconds when nothing else was emitted, so 0 -> "0s".
    if secs or not parts:
        parts.append(f"{secs}s")
    return sign + " ".join(parts)
# Returns whether a given datetime object is in the daily offline window [offline_start, offline_end)
def is_within_offline_hours(current_time: datetime, offline_start: int, offline_end: int) -> bool:
    """Tell whether ``current_time`` falls inside the daily offline window.

    The window is half-open: it starts at ``offline_start`` o'clock (inclusive)
    and ends at ``offline_end`` o'clock (exclusive). Only the time-of-day part
    of ``current_time`` is considered.

    :param current_time: the moment to test
    :param offline_start: hour of day at which the offline window begins
    :param offline_end: hour of day at which the offline window ends
    :return: ``True`` if the time of day lies in the offline window
    """
    # Equal bounds are treated as "offline around the clock".
    if offline_start == offline_end:
        return True

    start_sec = offline_start * 3600
    end_sec = offline_end * 3600
    time_sec = current_time.hour * 3600 + current_time.minute * 60 + current_time.second

    if offline_start < offline_end:
        # Window sits inside a single day, e.g. 09:00 - 17:00.
        return start_sec <= time_sec < end_sec

    # Window wraps past midnight, e.g. 22:00 - 06:00. The end bound is
    # exclusive here too, so both window shapes agree that the exact end
    # hour is already online again.
    return time_sec >= start_sec or time_sec < end_sec
# Returns the next email time to send for
def calculate_next_time(current_time: datetime, break_sec: int, offline_start: int, offline_end: int) -> datetime:
    """Compute when the next email should be sent, pushing it past offline hours.

    The tentative time is ``current_time + break_sec``. If that lands outside
    the offline window it is returned unchanged. Otherwise the wait is extended
    to the end of the offline window plus the initial break once more, and the
    steps of that calculation are printed.

    :param current_time: the reference moment
    :param break_sec: requested pause in seconds
    :param offline_start: hour of day at which the offline window begins
    :param offline_end: hour of day at which the offline window ends
    :return: the moment at which the next email should be sent
    """
    break_time = timedelta(seconds=break_sec)
    next_time = current_time + break_time

    # Nothing to adjust when the tentative time is already online.
    if not is_within_offline_hours(next_time, offline_start, offline_end):
        return next_time

    print(f"> OFFLINE HOURS: "
          f"{time(offline_start, 0, 0).strftime('%I:%M:%S %p')} - "
          f"{time(offline_end, 0, 0).strftime('%I:%M:%S %p')}\n"
          f"> Current Time: {current_time.strftime('%I:%M:%S %p')}\n"
          f"> Next Time: {next_time.strftime('%I:%M:%S %p')}\n"
          f"> break_time = {format_time(break_time.total_seconds())}\n")

    # Seconds from the tentative time to the end of the offline window. The
    # modulo wraps a negative difference into the following day, which covers
    # offline windows that run past midnight.
    next_time_sec = next_time.hour * 3600 + next_time.minute * 60 + next_time.second
    sec_till_offline_end = (offline_end * 3600 - next_time_sec) % (24 * 3600)

    # Grow the break so it reaches the end of the offline window.
    break_time += timedelta(seconds=sec_till_offline_end)
    print(f"> {next_time.strftime('%I:%M:%S %p')} is OFFLINE!\n"
          f"> Add {format_time(sec_till_offline_end)} to get to {time(offline_end, 0, 0).strftime('%I:%M %p')}, "
          f"total = {format_time(break_time.total_seconds())}\n"
          f"> Add initial break {format_time((next_time - current_time).total_seconds())}"
          f" = {format_time((next_time + break_time - current_time).total_seconds())}\n")

    # next_time already contains the initial break, so the result is
    # offline end + initial break.
    return next_time + break_time
# Send emails with a delay in the range [min_delay, max_delay] (minutes)
def spaced_interval_algo(econfig: EmailConfig) -> None:
    """Send emails forever, separated by a random pause, until interrupted.

    Each round draws a pause between ``email_break_minutes_min`` and
    ``email_break_minutes_max`` minutes (in seconds), asks
    ``calculate_next_time`` for the wake-up time, sends an email if the
    un-adjusted wake-up time is outside offline hours, and then sleeps until
    the wake-up time. Ctrl-C (``KeyboardInterrupt``) ends the loop.

    :param econfig: email configuration; also performs the actual sending
    """
    try:
        while True:
            # Random gap, in seconds, before the following email.
            gap_sec = random.randint(econfig.email_break_minutes_min * 60, econfig.email_break_minutes_max * 60)
            current_time = datetime.now()
            next_time = calculate_next_time(
                current_time, gap_sec, econfig.offline_hr_start, econfig.offline_hr_end
            )

            # NOTE(review): the send is gated on whether the *tentative next*
            # time is offline, not on the current time - confirm this is intended.
            tentative_time = current_time + timedelta(seconds=gap_sec)
            if not is_within_offline_hours(tentative_time, econfig.offline_hr_start, econfig.offline_hr_end):
                econfig.send_email()
                print(f"Sent #{econfig.num_emails_sent} to {econfig.to_email} at {current_time.strftime('%I:%M:%S %p')}")

            # Wait for the (possibly offline-adjusted) wake-up time.
            sleep_sec = (next_time - current_time).total_seconds()
            print(f"Next email will be sent at {next_time.strftime('%I:%M:%S %p')} (in {format_time(sleep_sec)})\n")
            sleep(sleep_sec)
    except KeyboardInterrupt:
        print("Email script terminated.")
# Send emails in short bursts and take a longer break in range [break_min, break_max] between bursts
def burst_email_algo(econfig: EmailConfig) -> None:
    """Send emails in bursts separated by long breaks, until interrupted.

    Each cycle sends a random number of emails (between
    ``burst_num_emails_min`` and ``burst_num_emails_max``), pausing a random
    ``burst_min_freq_sec``..``burst_max_freq_sec`` seconds after each one. When
    the burst is finished it sleeps for a long break of
    ``burst_long_break_minutes_min``..``burst_long_break_minutes_max`` minutes,
    adjusted by ``calculate_next_time``. Ctrl-C (``KeyboardInterrupt``) ends
    the loop.

    :param econfig: email configuration; also performs the actual sending
    """
    try:
        while True:
            # Decide how many emails this burst contains
            num_burst_emails = random.randint(econfig.burst_num_emails_min, econfig.burst_num_emails_max)
            print(f"BURST ALGORITHM\n\nSending {num_burst_emails} emails in burst...\n")
            for i in range(num_burst_emails):
                # Calculate break time
                burst_break_sec: int = random.randint(econfig.burst_min_freq_sec, econfig.burst_max_freq_sec)
                current_time: datetime = datetime.now()
                next_time: datetime = current_time + timedelta(seconds=burst_break_sec)
                # Sending burst during offline hours, sleep until hours are finished
                # (the check is made against the tentative next send time)
                if is_within_offline_hours(next_time, econfig.offline_hr_start, econfig.offline_hr_end):
                    burst_resume_time = calculate_next_time(
                        current_time,
                        burst_break_sec,
                        econfig.offline_hr_start,
                        econfig.offline_hr_end)
                    pause_time = (burst_resume_time - current_time).total_seconds()
                    print(f"Scheduled to continue at {burst_resume_time.strftime('%I:%M:%S %p')} "
                          f"(in {format_time(pause_time)})\n")
                    sleep(pause_time)
                # Send burst email
                econfig.send_email()
                # Update times (re-read the clock, since the pause above may have slept)
                current_time = datetime.now()
                next_time = current_time + timedelta(seconds=burst_break_sec)
                # Print info
                print(f"Sent #{econfig.num_emails_sent} "
                      f"to {econfig.to_email} "
                      f"at {current_time.strftime('%I:%M:%S %p')}\n"
                      f"Next email will be sent at {next_time.strftime('%I:%M:%S %p')} "
                      f"(in {format_time(burst_break_sec)})\n"
                      f"{i + 1}/{num_burst_emails} burst emails sent\n")
                # Short pause before the next email of the burst
                sleep(burst_break_sec)
            # Take a longer break
            rand_break = random.randint(econfig.burst_long_break_minutes_min * 60,
                                        econfig.burst_long_break_minutes_max * 60)
            current_time = datetime.now()
            # calculate_next_time extends the break when it would end in offline hours
            next_time = calculate_next_time(current_time, rand_break, econfig.offline_hr_start, econfig.offline_hr_end)
            long_break_sec = (next_time - current_time).total_seconds()
            # Print details for next email
            print(f"Long break. Next burst at {next_time.strftime('%I:%M:%S %p')} (in {format_time(long_break_sec)})\n")
            sleep(long_break_sec)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the script
        print("Email script terminated.")
def main():
    """Build the email configuration from ``config.json`` and run the burst algorithm."""
    # spaced_interval_algo is the alternative sending strategy; call it here
    # in place of burst_email_algo to switch.
    burst_email_algo(EmailConfig("config.json"))


if __name__ == '__main__':
    main()