This repository has been archived by the owner on May 7, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 12
/
messageinfo.py
179 lines (138 loc) · 5.82 KB
/
messageinfo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#Modified by Joao Paulo Barraca <[email protected]>
import email
import email.utils
import email.header
import imaplib
import md5
import time
import re
InternalDate = re.compile(r'.*INTERNALDATE "'
r'(?P<day>[ 0123][0-9])-(?P<mon>[A-Z][a-z][a-z])-(?P<year>[0-9][0-9][0-9][0-9])'
r' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])'
r' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])'
r'"')
class MessageInfo(object):
__oldestMessageSec = time.mktime([2027, 12, 31, 23, 59, 59, 0, 0, 0])
__newestMessageSec = time.mktime([1970, 1, 1, 0, 0, 0, 0, 0, 0])
__parseDates = True
__hasDate = False
_NAME_CACHE = {}
def __init__(self):
self.__message_id = None
self.__mailboxes = []
self.is_from_me = False
self.is_to_me = False
self.__parsed_name_address = {}
def PopulateField(self, name, value):
if name == "UID":
self.__uid = value
elif name == "RFC822.SIZE":
self.size = int(value)
elif name == "FLAGS":
self.__flags = value
elif name == "INTERNALDATE":
self.__date_string = value
self.__hasDate = True
self.__date_tuple = imaplib.Internaldate2tuple('INTERNALDATE "%s"' % value)
self.__date_sec = time.mktime(self.__date_tuple)
if self.__date_sec > MessageInfo.__newestMessageSec:
MessageInfo.__newestMessageSec = self.__date_sec
if self.__date_sec < MessageInfo.__oldestMessageSec:
MessageInfo.__oldestMessageSec = self.__date_sec
elif name == "RFC822.HEADER":
try:
unicode(value, errors='strict')
except:
value = unicode(value, errors='ignore')
self.headers = email.message_from_string(value)
else: raise AssertionError("unknown field: %s" % name)
def GetMessageId(self):
if not self.__message_id:
d = md5.new()
d.update(str(self.size))
d.update(self.__date_string)
self.__message_id = d.digest()
return self.__message_id
def AddMailbox(self, mailbox):
self.__mailboxes.append(mailbox)
def HasDate(self):
return self.__hasDate
def GetDate(self):
return self.__date_tuple
def GetSender(self):
return self._GetNameAddress("from")
def GetListId(self):
(name, address) = self._GetNameAddress("list-id")
# Don't use the name part of the list-id header, it tends to be overly
# descriptive (i.e. too long)
return address, address
def GetRecipients(self):
tos = self.GetHeaderAll('to')
ccs = self.GetHeaderAll('cc')
bccs = self.GetHeaderAll('bcc')
resent_tos = self.GetHeaderAll('resent-to')
resent_ccs = self.GetHeaderAll('resent-cc')
all_recipients = email.utils.getaddresses(tos + ccs + bccs+resent_tos + resent_ccs)
# Cleaned up and uniquefied
recipients_map = {}
for name, address in all_recipients:
if address:
name, address = self._GetCleanedUpNameAddress(name, address)
recipients_map[address] = name.replace("'", "")
return [(name, address) for address, name in recipients_map.items()]
def _GetNameAddress(self, header):
if header not in self.headers:
return None, None
if header not in self.__parsed_name_address:
header_value = self.GetHeader(header)
header_value = header_value.replace("\n", " ")
header_value = header_value.replace("\r", " ")
name, address = email.utils.parseaddr(header_value)
if address:
name, address = self._GetCleanedUpNameAddress(name, address)
self.__parsed_name_address[header] = name, address
return self.__parsed_name_address[header]
_PLUS_ADDRESS_RE = re.compile("\+.*@")
def GetHeader(self, name):
return self._GetDecodedValue(self.headers[name])
def GetHeaderAll(self, name):
values = self.headers.get_all(name, [])
return [self._GetDecodedValue(value) for value in values]
def _GetDecodedValue(self, value):
try:
pieces = email.header.decode_header(value)
unicode_pieces = [unicode(text, charset or "ascii") for text, charset in pieces]
return u"".join(unicode_pieces)
except LookupError:
# Ignore bogus encodings
return value
except UnicodeDecodeError:
# Ignore mis-encoded data
return value
def _GetCleanedUpNameAddress(self, name, address):
address = address.lower()
address = MessageInfo._PLUS_ADDRESS_RE.sub("@", address)
if name == "No Description Available":
name = None
cache = MessageInfo._NAME_CACHE
if address in cache:
if name:
cache[address][name] = cache[address].get(name, 0) + 1
# TODO(mihaip): cache max and compare with updated key and re-compute if
# necessary, instead of re-calculating every time
popular_name_pair = max(cache[address].items(), key=lambda pair: pair[1])
name = popular_name_pair[0]
elif name:
cache[address] = {}
cache[address][name] = 1
else:
name = address
return name, address
def GetDateRange():
return [MessageInfo.__oldestMessageSec, MessageInfo.__newestMessageSec]
GetDateRange = staticmethod(GetDateRange)
def SetParseDate(parseDates):
MessageInfo.__parseDates = parseDates
SetParseDate = staticmethod(SetParseDate)
def __str__(self):
return "%s (size: %d, date: %s)" % (self.GetHeader("subject"), self.size, self.__date_string)