Package logilab :: Package common :: Module umessage
[frames | no frames]

Source Code for Module logilab.common.umessage

  1  # copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. 
  2  # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr 
  3  # 
  4  # This file is part of logilab-common. 
  5  # 
  6  # logilab-common is free software: you can redistribute it and/or modify it under 
  7  # the terms of the GNU Lesser General Public License as published by the Free 
  8  # Software Foundation, either version 2.1 of the License, or (at your option) any 
  9  # later version. 
 10  # 
 11  # logilab-common is distributed in the hope that it will be useful, but WITHOUT 
 12  # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 
 13  # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 14  # details. 
 15  # 
 16  # You should have received a copy of the GNU Lesser General Public License along 
 17  # with logilab-common.  If not, see <http://www.gnu.org/licenses/>. 
 18  """Unicode email support (extends email from stdlib)""" 
 19   
 20  __docformat__ = "restructuredtext en" 
 21   
 22  import email 
 23  from encodings import search_function 
 24  import sys 
 25  from email.utils import parseaddr, parsedate 
 26  from email.header import decode_header 
 27   
 28  from datetime import datetime 
 29   
 30  from six import text_type, binary_type 
 31   
 32  try: 
 33      from mx.DateTime import DateTime 
 34  except ImportError: 
 35      DateTime = datetime 
 36   
 37  import logilab.common as lgc 
 38   
 39   
def decode_QP(string):
    """Decode an RFC 2047 encoded header value into one unicode string.

    The value is split with ``email.header.decode_header``; every chunk that
    comes back as bytes is decoded with the charset announced for it, or with
    iso-8859-15 when no charset is announced.  A chunk announcing a charset
    for which Python has no codec is decoded as iso-8859-15 as well instead
    of raising ``LookupError``, so a single malformed header cannot make the
    whole message unreadable.

    :param string: raw header value, possibly containing encoded words
    :return: the decoded header as a unicode string
    """
    fallback_charset = 'iso-8859-15'
    parts = []
    for decoded, charset in decode_header(string):
        if not charset:
            charset = fallback_charset
        # python 3 sometimes returns str and sometimes bytes.
        # the 'official' fix is to use the new 'policy' APIs
        # https://bugs.python.org/issue24797
        # let's just handle this bug ourselves for now
        # (`bytes` is the same type as six.binary_type on python 2.6+ and 3)
        if isinstance(decoded, bytes):
            try:
                decoded = decoded.decode(charset, 'replace')
            except LookupError:
                # unknown / misspelled charset name in the encoded word
                decoded = decoded.decode(fallback_charset, 'replace')
        parts.append(decoded)

    if sys.version_info < (3, 3):
        # decoding was non-RFC compliant wrt to whitespace handling
        # see http://bugs.python.org/issue1079
        return u' '.join(parts)
    return u''.join(parts)
59
def message_from_file(fd):
    """Parse the email readable from the file object `fd`.

    :param fd: file-like object handed to ``email.message_from_file``
    :return: a `UMessage` wrapping the parsed message, or an empty string
      when the parser raises ``MessageParseError``
    """
    try:
        parsed = email.message_from_file(fd)
    except email.errors.MessageParseError:
        return ''
    return UMessage(parsed)
65
def message_from_string(string):
    """Parse the email contained in `string`.

    :param string: message source handed to ``email.message_from_string``
    :return: a `UMessage` wrapping the parsed message, or an empty string
      when the parser raises ``MessageParseError``
    """
    try:
        parsed = email.message_from_string(string)
    except email.errors.MessageParseError:
        return ''
    return UMessage(parsed)
71
class UMessage:
    """Wrapper around an email message object (kept in `self.message`).

    Header accessors run their values through `decode_QP`; text payloads and
    content types are converted to unicode text; sub-messages are handed back
    wrapped in `UMessage` again.
    """

    def __init__(self, message):
        self.message = message

    # email.Message interface #################################################

    def get(self, header, default=None):
        """Return the decoded value of `header`.

        A falsy value (missing header with a falsy `default`, empty header)
        is returned untouched, without decoding.
        """
        raw = self.message.get(header, default)
        return decode_QP(raw) if raw else raw

    def __getitem__(self, header):
        """Same as `get(header)`: None when the header is absent."""
        return self.get(header)

    def get_all(self, header, default=()):
        """Return the list of decoded values of every `header` occurrence.

        None entries coming from the wrapped message are skipped.
        """
        decoded_values = []
        for raw in self.message.get_all(header, default):
            if raw is not None:
                decoded_values.append(decode_QP(raw))
        return decoded_values

    def is_multipart(self):
        """Delegate to the wrapped message's `is_multipart`."""
        return self.message.is_multipart()

    def get_boundary(self):
        """Delegate to the wrapped message's `get_boundary`."""
        return self.message.get_boundary()

    def walk(self):
        """Yield a `UMessage` for each part the wrapped message walks over."""
        for subpart in self.message.walk():
            yield UMessage(subpart)

    def get_payload(self, index=None, decode=False):
        """Return the payload of the wrapped message.

        * with an `index`, the selected sub-message wrapped in a `UMessage`
        * without index, for a list payload, a list of `UMessage`
        * without index, for a non-text main type, the payload as returned by
          the wrapped message
        * otherwise unicode text: the payload itself when it is already text,
          else its bytes decoded with the declared charset (iso-8859-1 when
          none is declared or no codec is found for it), undecodable bytes
          being replaced
        """
        wrapped = self.message
        if index is not None:
            return UMessage(wrapped.get_payload(index, decode))

        content = wrapped.get_payload(index, decode)
        if isinstance(content, list):
            return [UMessage(child) for child in content]
        if (wrapped.get_content_maintype() != 'text'
                or isinstance(content, text_type)):
            return content

        charset = wrapped.get_content_charset() or 'iso-8859-1'
        if search_function(charset) is None:
            # no codec registered under that name
            charset = 'iso-8859-1'
        return text_type(content or b'', charset, "replace")

    def get_content_maintype(self):
        """Return the main content type of the wrapped message as text."""
        return text_type(self.message.get_content_maintype())

    def get_content_type(self):
        """Return the content type of the wrapped message as text."""
        return text_type(self.message.get_content_type())

    def get_filename(self, failobj=None):
        """Return the message's filename as text.

        `failobj` is returned as is when the wrapped message hands it back;
        a filename that cannot be converted yields a fixed error string.
        """
        filename = self.message.get_filename(failobj)
        if filename is not failobj:
            try:
                filename = text_type(filename)
            except UnicodeDecodeError:
                filename = u'error decoding filename'
        return filename

    # other convenience methods ###############################################

    def headers(self):
        """return a string with one 'name: decoded value' line per header"""
        return '\n'.join([u'%s: %s' % (name, self.get(name))
                          for name in self.message.keys()])

    def multi_addrs(self, header):
        """return a list of (name, address) 2-tuples, one per value of the
        given header (expected to be an address header such as from, to,
        cc...), each value being split with `parseaddr`
        """
        return [(name, mail)
                for name, mail in map(parseaddr, self.get_all(header, ()))]

    def date(self, alternative_source=False, return_str=False):
        """return a datetime object for the email's date, or None if no date
        is set or if it can't be parsed

        :param alternative_source: when true and there is no date header,
          use the third space-separated field of the unix "From " line
          instead, if the message has one
        :param return_str: when true, return the raw date string instead of
          None if it can't be parsed
        """
        raw = self.get('date')
        if raw is None and alternative_source:
            envelope = self.message.get_unixfrom()
            if envelope is not None:
                fields = envelope.split(" ", 2)
                if len(fields) > 2:
                    raw = fields[2]
        if raw is None:
            return raw

        parsed = parsedate(raw)
        if not parsed:
            return raw if return_str else None
        factory = DateTime if lgc.USE_MX_DATETIME else datetime
        return factory(*parsed[:6])
178