Remove obsolete stuffs
This commit is contained in:
166
bin/python/python-3.13/Lib/test/test_email/__init__.py
Normal file
166
bin/python/python-3.13/Lib/test/test_email/__init__.py
Normal file
@@ -0,0 +1,166 @@
|
||||
import os
|
||||
import unittest
|
||||
import collections
|
||||
import email
|
||||
from email.message import Message
|
||||
from email._policybase import compat32
|
||||
from test.support import load_package_tests
|
||||
from test.test_email import __file__ as landmark
|
||||
|
||||
# Load all tests in package
|
||||
def load_tests(*args):
|
||||
return load_package_tests(os.path.dirname(__file__), *args)
|
||||
|
||||
|
||||
# helper code used by a number of test modules.
|
||||
|
||||
def openfile(filename, *args, **kws):
|
||||
path = os.path.join(os.path.dirname(landmark), 'data', filename)
|
||||
return open(path, *args, **kws)
|
||||
|
||||
|
||||
# Base test class
|
||||
class TestEmailBase(unittest.TestCase):
|
||||
|
||||
maxDiff = None
|
||||
# Currently the default policy is compat32. By setting that as the default
|
||||
# here we make minimal changes in the test_email tests compared to their
|
||||
# pre-3.3 state.
|
||||
policy = compat32
|
||||
# Likewise, the default message object is Message.
|
||||
message = Message
|
||||
|
||||
def __init__(self, *args, **kw):
|
||||
super().__init__(*args, **kw)
|
||||
self.addTypeEqualityFunc(bytes, self.assertBytesEqual)
|
||||
|
||||
# Backward compatibility to minimize test_email test changes.
|
||||
ndiffAssertEqual = unittest.TestCase.assertEqual
|
||||
|
||||
def _msgobj(self, filename):
|
||||
with openfile(filename, encoding="utf-8") as fp:
|
||||
return email.message_from_file(fp, policy=self.policy)
|
||||
|
||||
def _str_msg(self, string, message=None, policy=None):
|
||||
if policy is None:
|
||||
policy = self.policy
|
||||
if message is None:
|
||||
message = self.message
|
||||
return email.message_from_string(string, message, policy=policy)
|
||||
|
||||
def _bytes_msg(self, bytestring, message=None, policy=None):
|
||||
if policy is None:
|
||||
policy = self.policy
|
||||
if message is None:
|
||||
message = self.message
|
||||
return email.message_from_bytes(bytestring, message, policy=policy)
|
||||
|
||||
def _make_message(self):
|
||||
return self.message(policy=self.policy)
|
||||
|
||||
def _bytes_repr(self, b):
|
||||
return [repr(x) for x in b.splitlines(keepends=True)]
|
||||
|
||||
def assertBytesEqual(self, first, second, msg):
|
||||
"""Our byte strings are really encoded strings; improve diff output"""
|
||||
self.assertEqual(self._bytes_repr(first), self._bytes_repr(second))
|
||||
|
||||
def assertDefectsEqual(self, actual, expected):
|
||||
self.assertEqual(len(actual), len(expected), actual)
|
||||
for i in range(len(actual)):
|
||||
self.assertIsInstance(actual[i], expected[i],
|
||||
'item {}'.format(i))
|
||||
|
||||
|
||||
def parameterize(cls):
|
||||
"""A test method parameterization class decorator.
|
||||
|
||||
Parameters are specified as the value of a class attribute that ends with
|
||||
the string '_params'. Call the portion before '_params' the prefix. Then
|
||||
a method to be parameterized must have the same prefix, the string
|
||||
'_as_', and an arbitrary suffix.
|
||||
|
||||
The value of the _params attribute may be either a dictionary or a list.
|
||||
The values in the dictionary and the elements of the list may either be
|
||||
single values, or a list. If single values, they are turned into single
|
||||
element tuples. However derived, the resulting sequence is passed via
|
||||
*args to the parameterized test function.
|
||||
|
||||
In a _params dictionary, the keys become part of the name of the generated
|
||||
tests. In a _params list, the values in the list are converted into a
|
||||
string by joining the string values of the elements of the tuple by '_' and
|
||||
converting any blanks into '_'s, and this become part of the name.
|
||||
The full name of a generated test is a 'test_' prefix, the portion of the
|
||||
test function name after the '_as_' separator, plus an '_', plus the name
|
||||
derived as explained above.
|
||||
|
||||
For example, if we have:
|
||||
|
||||
count_params = range(2)
|
||||
|
||||
def count_as_foo_arg(self, foo):
|
||||
self.assertEqual(foo+1, myfunc(foo))
|
||||
|
||||
we will get parameterized test methods named:
|
||||
test_foo_arg_0
|
||||
test_foo_arg_1
|
||||
test_foo_arg_2
|
||||
|
||||
Or we could have:
|
||||
|
||||
example_params = {'foo': ('bar', 1), 'bing': ('bang', 2)}
|
||||
|
||||
def example_as_myfunc_input(self, name, count):
|
||||
self.assertEqual(name+str(count), myfunc(name, count))
|
||||
|
||||
and get:
|
||||
test_myfunc_input_foo
|
||||
test_myfunc_input_bing
|
||||
|
||||
Note: if and only if the generated test name is a valid identifier can it
|
||||
be used to select the test individually from the unittest command line.
|
||||
|
||||
The values in the params dict can be a single value, a tuple, or a
|
||||
dict. If a single value of a tuple, it is passed to the test function
|
||||
as positional arguments. If a dict, it is a passed via **kw.
|
||||
|
||||
"""
|
||||
paramdicts = {}
|
||||
testers = collections.defaultdict(list)
|
||||
for name, attr in cls.__dict__.items():
|
||||
if name.endswith('_params'):
|
||||
if not hasattr(attr, 'keys'):
|
||||
d = {}
|
||||
for x in attr:
|
||||
if not hasattr(x, '__iter__'):
|
||||
x = (x,)
|
||||
n = '_'.join(str(v) for v in x).replace(' ', '_')
|
||||
d[n] = x
|
||||
attr = d
|
||||
paramdicts[name[:-7] + '_as_'] = attr
|
||||
if '_as_' in name:
|
||||
testers[name.split('_as_')[0] + '_as_'].append(name)
|
||||
testfuncs = {}
|
||||
for name in paramdicts:
|
||||
if name not in testers:
|
||||
raise ValueError("No tester found for {}".format(name))
|
||||
for name in testers:
|
||||
if name not in paramdicts:
|
||||
raise ValueError("No params found for {}".format(name))
|
||||
for name, attr in cls.__dict__.items():
|
||||
for paramsname, paramsdict in paramdicts.items():
|
||||
if name.startswith(paramsname):
|
||||
testnameroot = 'test_' + name[len(paramsname):]
|
||||
for paramname, params in paramsdict.items():
|
||||
if hasattr(params, 'keys'):
|
||||
test = (lambda self, name=name, params=params:
|
||||
getattr(self, name)(**params))
|
||||
else:
|
||||
test = (lambda self, name=name, params=params:
|
||||
getattr(self, name)(*params))
|
||||
testname = testnameroot + '_' + paramname
|
||||
test.__name__ = testname
|
||||
testfuncs[testname] = test
|
||||
for key, value in testfuncs.items():
|
||||
setattr(cls, key, value)
|
||||
return cls
|
||||
4
bin/python/python-3.13/Lib/test/test_email/__main__.py
Normal file
4
bin/python/python-3.13/Lib/test/test_email/__main__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from test.test_email import load_tests
|
||||
import unittest
|
||||
|
||||
unittest.main()
|
||||
@@ -0,0 +1,207 @@
|
||||
import unittest
|
||||
from email import _encoded_words as _ew
|
||||
from email import errors
|
||||
from test.test_email import TestEmailBase
|
||||
|
||||
|
||||
class TestDecodeQ(TestEmailBase):
|
||||
|
||||
def _test(self, source, ex_result, ex_defects=[]):
|
||||
result, defects = _ew.decode_q(source)
|
||||
self.assertEqual(result, ex_result)
|
||||
self.assertDefectsEqual(defects, ex_defects)
|
||||
|
||||
def test_no_encoded(self):
|
||||
self._test(b'foobar', b'foobar')
|
||||
|
||||
def test_spaces(self):
|
||||
self._test(b'foo=20bar=20', b'foo bar ')
|
||||
self._test(b'foo_bar_', b'foo bar ')
|
||||
|
||||
def test_run_of_encoded(self):
|
||||
self._test(b'foo=20=20=21=2Cbar', b'foo !,bar')
|
||||
|
||||
|
||||
class TestDecodeB(TestEmailBase):
|
||||
|
||||
def _test(self, source, ex_result, ex_defects=[]):
|
||||
result, defects = _ew.decode_b(source)
|
||||
self.assertEqual(result, ex_result)
|
||||
self.assertDefectsEqual(defects, ex_defects)
|
||||
|
||||
def test_simple(self):
|
||||
self._test(b'Zm9v', b'foo')
|
||||
|
||||
def test_missing_padding(self):
|
||||
# 1 missing padding character
|
||||
self._test(b'dmk', b'vi', [errors.InvalidBase64PaddingDefect])
|
||||
# 2 missing padding characters
|
||||
self._test(b'dg', b'v', [errors.InvalidBase64PaddingDefect])
|
||||
|
||||
def test_invalid_character(self):
|
||||
self._test(b'dm\x01k===', b'vi', [errors.InvalidBase64CharactersDefect])
|
||||
|
||||
def test_invalid_character_and_bad_padding(self):
|
||||
self._test(b'dm\x01k', b'vi', [errors.InvalidBase64CharactersDefect,
|
||||
errors.InvalidBase64PaddingDefect])
|
||||
|
||||
def test_invalid_length(self):
|
||||
self._test(b'abcde', b'abcde', [errors.InvalidBase64LengthDefect])
|
||||
|
||||
|
||||
class TestDecode(TestEmailBase):
|
||||
|
||||
def test_wrong_format_input_raises(self):
|
||||
with self.assertRaises(ValueError):
|
||||
_ew.decode('=?badone?=')
|
||||
with self.assertRaises(ValueError):
|
||||
_ew.decode('=?')
|
||||
with self.assertRaises(ValueError):
|
||||
_ew.decode('')
|
||||
with self.assertRaises(KeyError):
|
||||
_ew.decode('=?utf-8?X?somevalue?=')
|
||||
|
||||
def _test(self, source, result, charset='us-ascii', lang='', defects=[]):
|
||||
res, char, l, d = _ew.decode(source)
|
||||
self.assertEqual(res, result)
|
||||
self.assertEqual(char, charset)
|
||||
self.assertEqual(l, lang)
|
||||
self.assertDefectsEqual(d, defects)
|
||||
|
||||
def test_simple_q(self):
|
||||
self._test('=?us-ascii?q?foo?=', 'foo')
|
||||
|
||||
def test_simple_b(self):
|
||||
self._test('=?us-ascii?b?dmk=?=', 'vi')
|
||||
|
||||
def test_q_case_ignored(self):
|
||||
self._test('=?us-ascii?Q?foo?=', 'foo')
|
||||
|
||||
def test_b_case_ignored(self):
|
||||
self._test('=?us-ascii?B?dmk=?=', 'vi')
|
||||
|
||||
def test_non_trivial_q(self):
|
||||
self._test('=?latin-1?q?=20F=fcr=20Elise=20?=', ' Für Elise ', 'latin-1')
|
||||
|
||||
def test_q_escaped_bytes_preserved(self):
|
||||
self._test(b'=?us-ascii?q?=20\xACfoo?='.decode('us-ascii',
|
||||
'surrogateescape'),
|
||||
' \uDCACfoo',
|
||||
defects = [errors.UndecodableBytesDefect])
|
||||
|
||||
def test_b_undecodable_bytes_ignored_with_defect(self):
|
||||
self._test(b'=?us-ascii?b?dm\xACk?='.decode('us-ascii',
|
||||
'surrogateescape'),
|
||||
'vi',
|
||||
defects = [
|
||||
errors.InvalidBase64CharactersDefect,
|
||||
errors.InvalidBase64PaddingDefect])
|
||||
|
||||
def test_b_invalid_bytes_ignored_with_defect(self):
|
||||
self._test('=?us-ascii?b?dm\x01k===?=',
|
||||
'vi',
|
||||
defects = [errors.InvalidBase64CharactersDefect])
|
||||
|
||||
def test_b_invalid_bytes_incorrect_padding(self):
|
||||
self._test('=?us-ascii?b?dm\x01k?=',
|
||||
'vi',
|
||||
defects = [
|
||||
errors.InvalidBase64CharactersDefect,
|
||||
errors.InvalidBase64PaddingDefect])
|
||||
|
||||
def test_b_padding_defect(self):
|
||||
self._test('=?us-ascii?b?dmk?=',
|
||||
'vi',
|
||||
defects = [errors.InvalidBase64PaddingDefect])
|
||||
|
||||
def test_nonnull_lang(self):
|
||||
self._test('=?us-ascii*jive?q?test?=', 'test', lang='jive')
|
||||
|
||||
def test_unknown_8bit_charset(self):
|
||||
self._test('=?unknown-8bit?q?foo=ACbar?=',
|
||||
b'foo\xacbar'.decode('ascii', 'surrogateescape'),
|
||||
charset = 'unknown-8bit',
|
||||
defects = [])
|
||||
|
||||
def test_unknown_charset(self):
|
||||
self._test('=?foobar?q?foo=ACbar?=',
|
||||
b'foo\xacbar'.decode('ascii', 'surrogateescape'),
|
||||
charset = 'foobar',
|
||||
# XXX Should this be a new Defect instead?
|
||||
defects = [errors.CharsetError])
|
||||
|
||||
def test_invalid_character_in_charset(self):
|
||||
self._test('=?utf-8\udce2\udc80\udc9d?q?foo=ACbar?=',
|
||||
b'foo\xacbar'.decode('ascii', 'surrogateescape'),
|
||||
charset = 'utf-8\udce2\udc80\udc9d',
|
||||
# XXX Should this be a new Defect instead?
|
||||
defects = [errors.CharsetError])
|
||||
|
||||
def test_q_nonascii(self):
|
||||
self._test('=?utf-8?q?=C3=89ric?=',
|
||||
'Éric',
|
||||
charset='utf-8')
|
||||
|
||||
|
||||
class TestEncodeQ(TestEmailBase):
|
||||
|
||||
def _test(self, src, expected):
|
||||
self.assertEqual(_ew.encode_q(src), expected)
|
||||
|
||||
def test_all_safe(self):
|
||||
self._test(b'foobar', 'foobar')
|
||||
|
||||
def test_spaces(self):
|
||||
self._test(b'foo bar ', 'foo_bar_')
|
||||
|
||||
def test_run_of_encodables(self):
|
||||
self._test(b'foo ,,bar', 'foo__=2C=2Cbar')
|
||||
|
||||
|
||||
class TestEncodeB(TestEmailBase):
|
||||
|
||||
def test_simple(self):
|
||||
self.assertEqual(_ew.encode_b(b'foo'), 'Zm9v')
|
||||
|
||||
def test_padding(self):
|
||||
self.assertEqual(_ew.encode_b(b'vi'), 'dmk=')
|
||||
|
||||
|
||||
class TestEncode(TestEmailBase):
|
||||
|
||||
def test_q(self):
|
||||
self.assertEqual(_ew.encode('foo', 'utf-8', 'q'), '=?utf-8?q?foo?=')
|
||||
|
||||
def test_b(self):
|
||||
self.assertEqual(_ew.encode('foo', 'utf-8', 'b'), '=?utf-8?b?Zm9v?=')
|
||||
|
||||
def test_auto_q(self):
|
||||
self.assertEqual(_ew.encode('foo', 'utf-8'), '=?utf-8?q?foo?=')
|
||||
|
||||
def test_auto_q_if_short_mostly_safe(self):
|
||||
self.assertEqual(_ew.encode('vi.', 'utf-8'), '=?utf-8?q?vi=2E?=')
|
||||
|
||||
def test_auto_b_if_enough_unsafe(self):
|
||||
self.assertEqual(_ew.encode('.....', 'utf-8'), '=?utf-8?b?Li4uLi4=?=')
|
||||
|
||||
def test_auto_b_if_long_unsafe(self):
|
||||
self.assertEqual(_ew.encode('vi.vi.vi.vi.vi.', 'utf-8'),
|
||||
'=?utf-8?b?dmkudmkudmkudmkudmku?=')
|
||||
|
||||
def test_auto_q_if_long_mostly_safe(self):
|
||||
self.assertEqual(_ew.encode('vi vi vi.vi ', 'utf-8'),
|
||||
'=?utf-8?q?vi_vi_vi=2Evi_?=')
|
||||
|
||||
def test_utf8_default(self):
|
||||
self.assertEqual(_ew.encode('foo'), '=?utf-8?q?foo?=')
|
||||
|
||||
def test_lang(self):
|
||||
self.assertEqual(_ew.encode('foo', lang='jive'), '=?utf-8*jive?q?foo?=')
|
||||
|
||||
def test_unknown_8bit(self):
|
||||
self.assertEqual(_ew.encode('foo\uDCACbar', charset='unknown-8bit'),
|
||||
'=?unknown-8bit?q?foo=ACbar?=')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,81 @@
|
||||
# Copyright (C) 2002-2006 Python Software Foundation
|
||||
# Contact: email-sig@python.org
|
||||
# email package unit tests for (optional) Asian codecs
|
||||
|
||||
import unittest
|
||||
|
||||
from test.test_email import TestEmailBase
|
||||
from email.charset import Charset
|
||||
from email.header import Header, decode_header
|
||||
from email.message import Message
|
||||
|
||||
# We're compatible with Python 2.3, but it doesn't have the built-in Asian
|
||||
# codecs, so we have to skip all these tests.
|
||||
try:
|
||||
str(b'foo', 'euc-jp')
|
||||
except LookupError:
|
||||
raise unittest.SkipTest
|
||||
|
||||
|
||||
|
||||
class TestEmailAsianCodecs(TestEmailBase):
|
||||
def test_japanese_codecs(self):
|
||||
eq = self.ndiffAssertEqual
|
||||
jcode = "euc-jp"
|
||||
gcode = "iso-8859-1"
|
||||
j = Charset(jcode)
|
||||
g = Charset(gcode)
|
||||
h = Header("Hello World!")
|
||||
jhello = str(b'\xa5\xcf\xa5\xed\xa1\xbc\xa5\xef\xa1\xbc'
|
||||
b'\xa5\xeb\xa5\xc9\xa1\xaa', jcode)
|
||||
ghello = str(b'Gr\xfc\xdf Gott!', gcode)
|
||||
h.append(jhello, j)
|
||||
h.append(ghello, g)
|
||||
# BAW: This used to -- and maybe should -- fold the two iso-8859-1
|
||||
# chunks into a single encoded word. However it doesn't violate the
|
||||
# standard to have them as two encoded chunks and maybe it's
|
||||
# reasonable <wink> for each .append() call to result in a separate
|
||||
# encoded word.
|
||||
eq(h.encode(), """\
|
||||
Hello World! =?iso-2022-jp?b?GyRCJU8lbSE8JW8hPCVrJUkhKhsoQg==?=
|
||||
=?iso-8859-1?q?Gr=FC=DF_Gott!?=""")
|
||||
eq(decode_header(h.encode()),
|
||||
[(b'Hello World! ', None),
|
||||
(b'\x1b$B%O%m!<%o!<%k%I!*\x1b(B', 'iso-2022-jp'),
|
||||
(b'Gr\xfc\xdf Gott!', gcode)])
|
||||
subject_bytes = (b'test-ja \xa4\xd8\xc5\xea\xb9\xc6\xa4\xb5'
|
||||
b'\xa4\xec\xa4\xbf\xa5\xe1\xa1\xbc\xa5\xeb\xa4\xcf\xbb\xca\xb2'
|
||||
b'\xf1\xbc\xd4\xa4\xce\xbe\xb5\xc7\xa7\xa4\xf2\xc2\xd4\xa4\xc3'
|
||||
b'\xa4\xc6\xa4\xa4\xa4\xde\xa4\xb9')
|
||||
subject = str(subject_bytes, jcode)
|
||||
h = Header(subject, j, header_name="Subject")
|
||||
# test a very long header
|
||||
enc = h.encode()
|
||||
# TK: splitting point may differ by codec design and/or Header encoding
|
||||
eq(enc , """\
|
||||
=?iso-2022-jp?b?dGVzdC1qYSAbJEIkWEVqOUYkNSRsJD8lYSE8JWskTztKGyhC?=
|
||||
=?iso-2022-jp?b?GyRCMnE8VCROPjVHJyRyQlQkQyRGJCQkXiQ5GyhC?=""")
|
||||
# TK: full decode comparison
|
||||
eq(str(h).encode(jcode), subject_bytes)
|
||||
|
||||
def test_payload_encoding_utf8(self):
|
||||
jhello = str(b'\xa5\xcf\xa5\xed\xa1\xbc\xa5\xef\xa1\xbc'
|
||||
b'\xa5\xeb\xa5\xc9\xa1\xaa', 'euc-jp')
|
||||
msg = Message()
|
||||
msg.set_payload(jhello, 'utf-8')
|
||||
ustr = msg.get_payload(decode=True).decode(msg.get_content_charset())
|
||||
self.assertEqual(jhello, ustr)
|
||||
|
||||
def test_payload_encoding(self):
|
||||
jcode = 'euc-jp'
|
||||
jhello = str(b'\xa5\xcf\xa5\xed\xa1\xbc\xa5\xef\xa1\xbc'
|
||||
b'\xa5\xeb\xa5\xc9\xa1\xaa', jcode)
|
||||
msg = Message()
|
||||
msg.set_payload(jhello, jcode)
|
||||
ustr = msg.get_payload(decode=True).decode(msg.get_content_charset())
|
||||
self.assertEqual(jhello, ustr)
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -0,0 +1,836 @@
|
||||
import unittest
|
||||
from test.test_email import TestEmailBase, parameterize
|
||||
import textwrap
|
||||
from email import policy
|
||||
from email.message import EmailMessage
|
||||
from email.contentmanager import ContentManager, raw_data_manager
|
||||
|
||||
|
||||
@parameterize
|
||||
class TestContentManager(TestEmailBase):
|
||||
|
||||
policy = policy.default
|
||||
message = EmailMessage
|
||||
|
||||
get_key_params = {
|
||||
'full_type': (1, 'text/plain',),
|
||||
'maintype_only': (2, 'text',),
|
||||
'null_key': (3, '',),
|
||||
}
|
||||
|
||||
def get_key_as_get_content_key(self, order, key):
|
||||
def foo_getter(msg, foo=None):
|
||||
bar = msg['X-Bar-Header']
|
||||
return foo, bar
|
||||
cm = ContentManager()
|
||||
cm.add_get_handler(key, foo_getter)
|
||||
m = self._make_message()
|
||||
m['Content-Type'] = 'text/plain'
|
||||
m['X-Bar-Header'] = 'foo'
|
||||
self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo'))
|
||||
|
||||
def get_key_as_get_content_key_order(self, order, key):
|
||||
def bar_getter(msg):
|
||||
return msg['X-Bar-Header']
|
||||
def foo_getter(msg):
|
||||
return msg['X-Foo-Header']
|
||||
cm = ContentManager()
|
||||
cm.add_get_handler(key, foo_getter)
|
||||
for precedence, key in self.get_key_params.values():
|
||||
if precedence > order:
|
||||
cm.add_get_handler(key, bar_getter)
|
||||
m = self._make_message()
|
||||
m['Content-Type'] = 'text/plain'
|
||||
m['X-Bar-Header'] = 'bar'
|
||||
m['X-Foo-Header'] = 'foo'
|
||||
self.assertEqual(cm.get_content(m), ('foo'))
|
||||
|
||||
def test_get_content_raises_if_unknown_mimetype_and_no_default(self):
|
||||
cm = ContentManager()
|
||||
m = self._make_message()
|
||||
m['Content-Type'] = 'text/plain'
|
||||
with self.assertRaisesRegex(KeyError, 'text/plain'):
|
||||
cm.get_content(m)
|
||||
|
||||
class BaseThing(str):
|
||||
pass
|
||||
baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing'
|
||||
class Thing(BaseThing):
|
||||
pass
|
||||
testobject_full_path = __name__ + '.' + 'TestContentManager.Thing'
|
||||
|
||||
set_key_params = {
|
||||
'type': (0, Thing,),
|
||||
'full_path': (1, testobject_full_path,),
|
||||
'qualname': (2, 'TestContentManager.Thing',),
|
||||
'name': (3, 'Thing',),
|
||||
'base_type': (4, BaseThing,),
|
||||
'base_full_path': (5, baseobject_full_path,),
|
||||
'base_qualname': (6, 'TestContentManager.BaseThing',),
|
||||
'base_name': (7, 'BaseThing',),
|
||||
'str_type': (8, str,),
|
||||
'str_full_path': (9, 'builtins.str',),
|
||||
'str_name': (10, 'str',), # str name and qualname are the same
|
||||
'null_key': (11, None,),
|
||||
}
|
||||
|
||||
def set_key_as_set_content_key(self, order, key):
|
||||
def foo_setter(msg, obj, foo=None):
|
||||
msg['X-Foo-Header'] = foo
|
||||
msg.set_payload(obj)
|
||||
cm = ContentManager()
|
||||
cm.add_set_handler(key, foo_setter)
|
||||
m = self._make_message()
|
||||
msg_obj = self.Thing()
|
||||
cm.set_content(m, msg_obj, foo='bar')
|
||||
self.assertEqual(m['X-Foo-Header'], 'bar')
|
||||
self.assertEqual(m.get_payload(), msg_obj)
|
||||
|
||||
def set_key_as_set_content_key_order(self, order, key):
|
||||
def foo_setter(msg, obj):
|
||||
msg['X-FooBar-Header'] = 'foo'
|
||||
msg.set_payload(obj)
|
||||
def bar_setter(msg, obj):
|
||||
msg['X-FooBar-Header'] = 'bar'
|
||||
cm = ContentManager()
|
||||
cm.add_set_handler(key, foo_setter)
|
||||
for precedence, key in self.get_key_params.values():
|
||||
if precedence > order:
|
||||
cm.add_set_handler(key, bar_setter)
|
||||
m = self._make_message()
|
||||
msg_obj = self.Thing()
|
||||
cm.set_content(m, msg_obj)
|
||||
self.assertEqual(m['X-FooBar-Header'], 'foo')
|
||||
self.assertEqual(m.get_payload(), msg_obj)
|
||||
|
||||
def test_set_content_raises_if_unknown_type_and_no_default(self):
|
||||
cm = ContentManager()
|
||||
m = self._make_message()
|
||||
msg_obj = self.Thing()
|
||||
with self.assertRaisesRegex(KeyError, self.testobject_full_path):
|
||||
cm.set_content(m, msg_obj)
|
||||
|
||||
def test_set_content_raises_if_called_on_multipart(self):
|
||||
cm = ContentManager()
|
||||
m = self._make_message()
|
||||
m['Content-Type'] = 'multipart/foo'
|
||||
with self.assertRaises(TypeError):
|
||||
cm.set_content(m, 'test')
|
||||
|
||||
def test_set_content_calls_clear_content(self):
|
||||
m = self._make_message()
|
||||
m['Content-Foo'] = 'bar'
|
||||
m['Content-Type'] = 'text/html'
|
||||
m['To'] = 'test'
|
||||
m.set_payload('abc')
|
||||
cm = ContentManager()
|
||||
cm.add_set_handler(str, lambda *args, **kw: None)
|
||||
m.set_content('xyz', content_manager=cm)
|
||||
self.assertIsNone(m['Content-Foo'])
|
||||
self.assertIsNone(m['Content-Type'])
|
||||
self.assertEqual(m['To'], 'test')
|
||||
self.assertIsNone(m.get_payload())
|
||||
|
||||
|
||||
@parameterize
|
||||
class TestRawDataManager(TestEmailBase):
|
||||
# Note: these tests are dependent on the order in which headers are added
|
||||
# to the message objects by the code. There's no defined ordering in
|
||||
# RFC5322/MIME, so this makes the tests more fragile than the standards
|
||||
# require. However, if the header order changes it is best to understand
|
||||
# *why*, and make sure it isn't a subtle bug in whatever change was
|
||||
# applied.
|
||||
|
||||
policy = policy.default.clone(max_line_length=60,
|
||||
content_manager=raw_data_manager)
|
||||
message = EmailMessage
|
||||
|
||||
def test_get_text_plain(self):
|
||||
m = self._str_msg(textwrap.dedent("""\
|
||||
Content-Type: text/plain
|
||||
|
||||
Basic text.
|
||||
"""))
|
||||
self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n")
|
||||
|
||||
def test_get_text_html(self):
|
||||
m = self._str_msg(textwrap.dedent("""\
|
||||
Content-Type: text/html
|
||||
|
||||
<p>Basic text.</p>
|
||||
"""))
|
||||
self.assertEqual(raw_data_manager.get_content(m),
|
||||
"<p>Basic text.</p>\n")
|
||||
|
||||
def test_get_text_plain_latin1(self):
|
||||
m = self._bytes_msg(textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset=latin1
|
||||
|
||||
Basìc tëxt.
|
||||
""").encode('latin1'))
|
||||
self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
|
||||
|
||||
def test_get_text_plain_latin1_quoted_printable(self):
|
||||
m = self._str_msg(textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="latin-1"
|
||||
Content-Transfer-Encoding: quoted-printable
|
||||
|
||||
Bas=ECc t=EBxt.
|
||||
"""))
|
||||
self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
|
||||
|
||||
def test_get_text_plain_utf8_base64(self):
|
||||
m = self._str_msg(textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf8"
|
||||
Content-Transfer-Encoding: base64
|
||||
|
||||
QmFzw6xjIHTDq3h0Lgo=
|
||||
"""))
|
||||
self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
|
||||
|
||||
def test_get_text_plain_bad_utf8_quoted_printable(self):
|
||||
m = self._str_msg(textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf8"
|
||||
Content-Transfer-Encoding: quoted-printable
|
||||
|
||||
Bas=c3=acc t=c3=abxt=fd.
|
||||
"""))
|
||||
self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt<78>.\n")
|
||||
|
||||
def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self):
|
||||
m = self._str_msg(textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf8"
|
||||
Content-Transfer-Encoding: quoted-printable
|
||||
|
||||
Bas=c3=acc t=c3=abxt=fd.
|
||||
"""))
|
||||
self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
|
||||
"Basìc tëxt.\n")
|
||||
|
||||
def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self):
|
||||
m = self._str_msg(textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf8"
|
||||
Content-Transfer-Encoding: base64
|
||||
|
||||
QmFzw6xjIHTDq3h0Lgo\xFF=
|
||||
"""))
|
||||
self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
|
||||
"Basìc tëxt.\n")
|
||||
|
||||
def test_get_text_invalid_keyword(self):
|
||||
m = self._str_msg(textwrap.dedent("""\
|
||||
Content-Type: text/plain
|
||||
|
||||
Basic text.
|
||||
"""))
|
||||
with self.assertRaises(TypeError):
|
||||
raw_data_manager.get_content(m, foo='ignore')
|
||||
|
||||
def test_get_non_text(self):
|
||||
template = textwrap.dedent("""\
|
||||
Content-Type: {}
|
||||
Content-Transfer-Encoding: base64
|
||||
|
||||
Ym9ndXMgZGF0YQ==
|
||||
""")
|
||||
for maintype in 'audio image video application'.split():
|
||||
with self.subTest(maintype=maintype):
|
||||
m = self._str_msg(template.format(maintype+'/foo'))
|
||||
self.assertEqual(raw_data_manager.get_content(m), b"bogus data")
|
||||
|
||||
def test_get_non_text_invalid_keyword(self):
|
||||
m = self._str_msg(textwrap.dedent("""\
|
||||
Content-Type: image/jpg
|
||||
Content-Transfer-Encoding: base64
|
||||
|
||||
Ym9ndXMgZGF0YQ==
|
||||
"""))
|
||||
with self.assertRaises(TypeError):
|
||||
raw_data_manager.get_content(m, errors='ignore')
|
||||
|
||||
def test_get_raises_on_multipart(self):
|
||||
m = self._str_msg(textwrap.dedent("""\
|
||||
Content-Type: multipart/mixed; boundary="==="
|
||||
|
||||
--===
|
||||
--===--
|
||||
"""))
|
||||
with self.assertRaises(KeyError):
|
||||
raw_data_manager.get_content(m)
|
||||
|
||||
def test_get_message_rfc822_and_external_body(self):
|
||||
template = textwrap.dedent("""\
|
||||
Content-Type: message/{}
|
||||
|
||||
To: foo@example.com
|
||||
From: bar@example.com
|
||||
Subject: example
|
||||
|
||||
an example message
|
||||
""")
|
||||
for subtype in 'rfc822 external-body'.split():
|
||||
with self.subTest(subtype=subtype):
|
||||
m = self._str_msg(template.format(subtype))
|
||||
sub_msg = raw_data_manager.get_content(m)
|
||||
self.assertIsInstance(sub_msg, self.message)
|
||||
self.assertEqual(raw_data_manager.get_content(sub_msg),
|
||||
"an example message\n")
|
||||
self.assertEqual(sub_msg['to'], 'foo@example.com')
|
||||
self.assertEqual(sub_msg['from'].addresses[0].username, 'bar')
|
||||
|
||||
def test_get_message_non_rfc822_or_external_body_yields_bytes(self):
|
||||
m = self._str_msg(textwrap.dedent("""\
|
||||
Content-Type: message/partial
|
||||
|
||||
To: foo@example.com
|
||||
From: bar@example.com
|
||||
Subject: example
|
||||
|
||||
The real body is in another message.
|
||||
"""))
|
||||
self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex')
|
||||
|
||||
def test_set_text_plain(self):
|
||||
m = self._make_message()
|
||||
content = "Simple message.\n"
|
||||
raw_data_manager.set_content(m, content)
|
||||
self.assertEqual(str(m), textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: 7bit
|
||||
|
||||
Simple message.
|
||||
"""))
|
||||
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
|
||||
self.assertEqual(m.get_content(), content)
|
||||
|
||||
def test_set_text_plain_null(self):
|
||||
m = self._make_message()
|
||||
content = ''
|
||||
raw_data_manager.set_content(m, content)
|
||||
self.assertEqual(str(m), textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: 7bit
|
||||
|
||||
|
||||
"""))
|
||||
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), '\n')
|
||||
self.assertEqual(m.get_content(), '\n')
|
||||
|
||||
def test_set_text_html(self):
|
||||
m = self._make_message()
|
||||
content = "<p>Simple message.</p>\n"
|
||||
raw_data_manager.set_content(m, content, subtype='html')
|
||||
self.assertEqual(str(m), textwrap.dedent("""\
|
||||
Content-Type: text/html; charset="utf-8"
|
||||
Content-Transfer-Encoding: 7bit
|
||||
|
||||
<p>Simple message.</p>
|
||||
"""))
|
||||
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
|
||||
self.assertEqual(m.get_content(), content)
|
||||
|
||||
def test_set_text_charset_latin_1(self):
|
||||
m = self._make_message()
|
||||
content = "Simple message.\n"
|
||||
raw_data_manager.set_content(m, content, charset='latin-1')
|
||||
self.assertEqual(str(m), textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="iso-8859-1"
|
||||
Content-Transfer-Encoding: 7bit
|
||||
|
||||
Simple message.
|
||||
"""))
|
||||
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
|
||||
self.assertEqual(m.get_content(), content)
|
||||
|
||||
def test_set_text_plain_long_line_heuristics(self):
|
||||
m = self._make_message()
|
||||
content = ("Simple but long message that is over 78 characters"
|
||||
" long to force transfer encoding.\n")
|
||||
raw_data_manager.set_content(m, content)
|
||||
self.assertEqual(str(m), textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: quoted-printable
|
||||
|
||||
Simple but long message that is over 78 characters long to =
|
||||
force transfer encoding.
|
||||
"""))
|
||||
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
|
||||
self.assertEqual(m.get_content(), content)
|
||||
|
||||
def test_set_text_short_line_minimal_non_ascii_heuristics(self):
|
||||
m = self._make_message()
|
||||
content = "et là il est monté sur moi et il commence à m'éto.\n"
|
||||
raw_data_manager.set_content(m, content)
|
||||
self.assertEqual(bytes(m), textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: 8bit
|
||||
|
||||
et là il est monté sur moi et il commence à m'éto.
|
||||
""").encode('utf-8'))
|
||||
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
|
||||
self.assertEqual(m.get_content(), content)
|
||||
|
||||
def test_set_text_long_line_minimal_non_ascii_heuristics(self):
|
||||
m = self._make_message()
|
||||
content = ("j'ai un problème de python. il est sorti de son"
|
||||
" vivarium. et là il est monté sur moi et il commence"
|
||||
" à m'éto.\n")
|
||||
raw_data_manager.set_content(m, content)
|
||||
self.assertEqual(bytes(m), textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: quoted-printable
|
||||
|
||||
j'ai un probl=C3=A8me de python. il est sorti de son vivari=
|
||||
um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
|
||||
=C3=A0 m'=C3=A9to.
|
||||
""").encode('utf-8'))
|
||||
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
|
||||
self.assertEqual(m.get_content(), content)
|
||||
|
||||
def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self):
|
||||
m = self._make_message()
|
||||
content = '\n'*10 + (
|
||||
"j'ai un problème de python. il est sorti de son"
|
||||
" vivarium. et là il est monté sur moi et il commence"
|
||||
" à m'éto.\n")
|
||||
raw_data_manager.set_content(m, content)
|
||||
self.assertEqual(bytes(m), textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: quoted-printable
|
||||
""" + '\n'*10 + """
|
||||
j'ai un probl=C3=A8me de python. il est sorti de son vivari=
|
||||
um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
|
||||
=C3=A0 m'=C3=A9to.
|
||||
""").encode('utf-8'))
|
||||
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
|
||||
self.assertEqual(m.get_content(), content)
|
||||
|
||||
def test_set_text_maximal_non_ascii_heuristics(self):
|
||||
m = self._make_message()
|
||||
content = "áàäéèęöő.\n"
|
||||
raw_data_manager.set_content(m, content)
|
||||
self.assertEqual(bytes(m), textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: 8bit
|
||||
|
||||
áàäéèęöő.
|
||||
""").encode('utf-8'))
|
||||
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
|
||||
self.assertEqual(m.get_content(), content)
|
||||
|
||||
def test_set_text_11_lines_maximal_non_ascii_heuristics(self):
|
||||
m = self._make_message()
|
||||
content = '\n'*10 + "áàäéèęöő.\n"
|
||||
raw_data_manager.set_content(m, content)
|
||||
self.assertEqual(bytes(m), textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: 8bit
|
||||
""" + '\n'*10 + """
|
||||
áàäéèęöő.
|
||||
""").encode('utf-8'))
|
||||
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
|
||||
self.assertEqual(m.get_content(), content)
|
||||
|
||||
def test_set_text_long_line_maximal_non_ascii_heuristics(self):
|
||||
m = self._make_message()
|
||||
content = ("áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
|
||||
"áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
|
||||
"áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
|
||||
raw_data_manager.set_content(m, content)
|
||||
self.assertEqual(bytes(m), textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: base64
|
||||
|
||||
w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD
|
||||
tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo
|
||||
xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD
|
||||
qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg
|
||||
w6TDqcOoxJnDtsWRLgo=
|
||||
""").encode('utf-8'))
|
||||
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
|
||||
self.assertEqual(m.get_content(), content)
|
||||
|
||||
def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self):
|
||||
# Yes, it chooses "wrong" here. It's a heuristic. So this result
|
||||
# could change if we come up with a better heuristic.
|
||||
m = self._make_message()
|
||||
content = ('\n'*10 +
|
||||
"áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
|
||||
"áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
|
||||
"áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
|
||||
raw_data_manager.set_content(m, "\n"*10 +
|
||||
"áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
|
||||
"áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
|
||||
"áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
|
||||
self.assertEqual(bytes(m), textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: quoted-printable
|
||||
""" + '\n'*10 + """
|
||||
=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=
|
||||
=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=
|
||||
=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=
|
||||
=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=
|
||||
=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=
|
||||
=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=
|
||||
=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=
|
||||
=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=
|
||||
=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=
|
||||
=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=
|
||||
=C5=91.
|
||||
""").encode('utf-8'))
|
||||
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
|
||||
self.assertEqual(m.get_content(), content)
|
||||
|
||||
def test_set_text_non_ascii_with_cte_7bit_raises(self):
|
||||
m = self._make_message()
|
||||
with self.assertRaises(UnicodeError):
|
||||
raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit')
|
||||
|
||||
def test_set_text_non_ascii_with_charset_ascii_raises(self):
|
||||
m = self._make_message()
|
||||
with self.assertRaises(UnicodeError):
|
||||
raw_data_manager.set_content(m,"áàäéèęöő.\n", charset='ascii')
|
||||
|
||||
def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self):
|
||||
m = self._make_message()
|
||||
with self.assertRaises(UnicodeError):
|
||||
raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit', charset='ascii')
|
||||
|
||||
def test_set_message(self):
|
||||
m = self._make_message()
|
||||
m['Subject'] = "Forwarded message"
|
||||
content = self._make_message()
|
||||
content['To'] = 'python@vivarium.org'
|
||||
content['From'] = 'police@monty.org'
|
||||
content['Subject'] = "get back in your box"
|
||||
content.set_content("Or face the comfy chair.")
|
||||
raw_data_manager.set_content(m, content)
|
||||
self.assertEqual(str(m), textwrap.dedent("""\
|
||||
Subject: Forwarded message
|
||||
Content-Type: message/rfc822
|
||||
Content-Transfer-Encoding: 8bit
|
||||
|
||||
To: python@vivarium.org
|
||||
From: police@monty.org
|
||||
Subject: get back in your box
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: 7bit
|
||||
MIME-Version: 1.0
|
||||
|
||||
Or face the comfy chair.
|
||||
"""))
|
||||
payload = m.get_payload(0)
|
||||
self.assertIsInstance(payload, self.message)
|
||||
self.assertEqual(str(payload), str(content))
|
||||
self.assertIsInstance(m.get_content(), self.message)
|
||||
self.assertEqual(str(m.get_content()), str(content))
|
||||
|
||||
def test_set_message_with_non_ascii_and_coercion_to_7bit(self):
|
||||
m = self._make_message()
|
||||
m['Subject'] = "Escape report"
|
||||
content = self._make_message()
|
||||
content['To'] = 'police@monty.org'
|
||||
content['From'] = 'victim@monty.org'
|
||||
content['Subject'] = "Help"
|
||||
content.set_content("j'ai un problème de python. il est sorti de son"
|
||||
" vivarium.")
|
||||
raw_data_manager.set_content(m, content)
|
||||
self.assertEqual(bytes(m), textwrap.dedent("""\
|
||||
Subject: Escape report
|
||||
Content-Type: message/rfc822
|
||||
Content-Transfer-Encoding: 8bit
|
||||
|
||||
To: police@monty.org
|
||||
From: victim@monty.org
|
||||
Subject: Help
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: 8bit
|
||||
MIME-Version: 1.0
|
||||
|
||||
j'ai un problème de python. il est sorti de son vivarium.
|
||||
""").encode('utf-8'))
|
||||
# The choice of base64 for the body encoding is because generator
|
||||
# doesn't bother with heuristics and uses it unconditionally for utf-8
|
||||
# text.
|
||||
# XXX: the first cte should be 7bit, too...that's a generator bug.
|
||||
# XXX: the line length in the body also looks like a generator bug.
|
||||
self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length),
|
||||
textwrap.dedent("""\
|
||||
Subject: Escape report
|
||||
Content-Type: message/rfc822
|
||||
Content-Transfer-Encoding: 8bit
|
||||
|
||||
To: police@monty.org
|
||||
From: victim@monty.org
|
||||
Subject: Help
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: base64
|
||||
MIME-Version: 1.0
|
||||
|
||||
aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt
|
||||
Lgo=
|
||||
"""))
|
||||
self.assertIsInstance(m.get_content(), self.message)
|
||||
self.assertEqual(str(m.get_content()), str(content))
|
||||
|
||||
def test_set_message_invalid_cte_raises(self):
|
||||
m = self._make_message()
|
||||
content = self._make_message()
|
||||
for cte in 'quoted-printable base64'.split():
|
||||
for subtype in 'rfc822 external-body'.split():
|
||||
with self.subTest(cte=cte, subtype=subtype):
|
||||
with self.assertRaises(ValueError) as ar:
|
||||
m.set_content(content, subtype, cte=cte)
|
||||
exc = str(ar.exception)
|
||||
self.assertIn(cte, exc)
|
||||
self.assertIn(subtype, exc)
|
||||
subtype = 'external-body'
|
||||
for cte in '8bit binary'.split():
|
||||
with self.subTest(cte=cte, subtype=subtype):
|
||||
with self.assertRaises(ValueError) as ar:
|
||||
m.set_content(content, subtype, cte=cte)
|
||||
exc = str(ar.exception)
|
||||
self.assertIn(cte, exc)
|
||||
self.assertIn(subtype, exc)
|
||||
|
||||
def test_set_image_jpg(self):
|
||||
for content in (b"bogus content",
|
||||
bytearray(b"bogus content"),
|
||||
memoryview(b"bogus content")):
|
||||
with self.subTest(content=content):
|
||||
m = self._make_message()
|
||||
raw_data_manager.set_content(m, content, 'image', 'jpeg')
|
||||
self.assertEqual(str(m), textwrap.dedent("""\
|
||||
Content-Type: image/jpeg
|
||||
Content-Transfer-Encoding: base64
|
||||
|
||||
Ym9ndXMgY29udGVudA==
|
||||
"""))
|
||||
self.assertEqual(m.get_payload(decode=True), content)
|
||||
self.assertEqual(m.get_content(), content)
|
||||
|
||||
def test_set_audio_aif_with_quoted_printable_cte(self):
|
||||
# Why you would use qp, I don't know, but it is technically supported.
|
||||
# XXX: the incorrect line length is because binascii.b2a_qp doesn't
|
||||
# support a line length parameter, but we must use it to get newline
|
||||
# encoding.
|
||||
# XXX: what about that lack of tailing newline? Do we actually handle
|
||||
# that correctly in all cases? That is, if the *source* has an
|
||||
# unencoded newline, do we add an extra newline to the returned payload
|
||||
# or not? And can that actually be disambiguated based on the RFC?
|
||||
m = self._make_message()
|
||||
content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
|
||||
m.set_content(content, 'audio', 'aif', cte='quoted-printable')
|
||||
self.assertEqual(bytes(m), textwrap.dedent("""\
|
||||
Content-Type: audio/aif
|
||||
Content-Transfer-Encoding: quoted-printable
|
||||
MIME-Version: 1.0
|
||||
|
||||
b=FFgus=09con=0At=0Dent=20zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz=
|
||||
zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz""").encode('latin-1'))
|
||||
self.assertEqual(m.get_payload(decode=True), content)
|
||||
self.assertEqual(m.get_content(), content)
|
||||
|
||||
def test_set_video_mpeg_with_binary_cte(self):
|
||||
m = self._make_message()
|
||||
content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
|
||||
m.set_content(content, 'video', 'mpeg', cte='binary')
|
||||
self.assertEqual(bytes(m), textwrap.dedent("""\
|
||||
Content-Type: video/mpeg
|
||||
Content-Transfer-Encoding: binary
|
||||
MIME-Version: 1.0
|
||||
|
||||
""").encode('ascii') +
|
||||
# XXX: the second \n ought to be a \r, but generator gets it wrong.
|
||||
# THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE.
|
||||
b'b\xFFgus\tcon\nt\nent zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz' +
|
||||
b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz')
|
||||
self.assertEqual(m.get_payload(decode=True), content)
|
||||
self.assertEqual(m.get_content(), content)
|
||||
|
||||
def test_set_application_octet_stream_with_8bit_cte(self):
|
||||
# In 8bit mode, universal line end logic applies. It is up to the
|
||||
# application to make sure the lines are short enough; we don't check.
|
||||
m = self._make_message()
|
||||
content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n'
|
||||
m.set_content(content, 'application', 'octet-stream', cte='8bit')
|
||||
self.assertEqual(bytes(m), textwrap.dedent("""\
|
||||
Content-Type: application/octet-stream
|
||||
Content-Transfer-Encoding: 8bit
|
||||
MIME-Version: 1.0
|
||||
|
||||
""").encode('ascii') +
|
||||
b'b\xFFgus\tcon\nt\nent\n' +
|
||||
b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz\n')
|
||||
self.assertEqual(m.get_payload(decode=True), content)
|
||||
self.assertEqual(m.get_content(), content)
|
||||
|
||||
def test_set_headers_from_header_objects(self):
|
||||
m = self._make_message()
|
||||
content = "Simple message.\n"
|
||||
header_factory = self.policy.header_factory
|
||||
raw_data_manager.set_content(m, content, headers=(
|
||||
header_factory("To", "foo@example.com"),
|
||||
header_factory("From", "foo@example.com"),
|
||||
header_factory("Subject", "I'm talking to myself.")))
|
||||
self.assertEqual(str(m), textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
To: foo@example.com
|
||||
From: foo@example.com
|
||||
Subject: I'm talking to myself.
|
||||
Content-Transfer-Encoding: 7bit
|
||||
|
||||
Simple message.
|
||||
"""))
|
||||
|
||||
def test_set_headers_from_strings(self):
|
||||
m = self._make_message()
|
||||
content = "Simple message.\n"
|
||||
raw_data_manager.set_content(m, content, headers=(
|
||||
"X-Foo-Header: foo",
|
||||
"X-Bar-Header: bar",))
|
||||
self.assertEqual(str(m), textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
X-Foo-Header: foo
|
||||
X-Bar-Header: bar
|
||||
Content-Transfer-Encoding: 7bit
|
||||
|
||||
Simple message.
|
||||
"""))
|
||||
|
||||
def test_set_headers_with_invalid_duplicate_string_header_raises(self):
|
||||
m = self._make_message()
|
||||
content = "Simple message.\n"
|
||||
with self.assertRaisesRegex(ValueError, 'Content-Type'):
|
||||
raw_data_manager.set_content(m, content, headers=(
|
||||
"Content-Type: foo/bar",)
|
||||
)
|
||||
|
||||
def test_set_headers_with_invalid_duplicate_header_header_raises(self):
|
||||
m = self._make_message()
|
||||
content = "Simple message.\n"
|
||||
header_factory = self.policy.header_factory
|
||||
with self.assertRaisesRegex(ValueError, 'Content-Type'):
|
||||
raw_data_manager.set_content(m, content, headers=(
|
||||
header_factory("Content-Type", " foo/bar"),)
|
||||
)
|
||||
|
||||
def test_set_headers_with_defective_string_header_raises(self):
|
||||
m = self._make_message()
|
||||
content = "Simple message.\n"
|
||||
with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
|
||||
raw_data_manager.set_content(m, content, headers=(
|
||||
'To: a@fairly@@invalid@address',)
|
||||
)
|
||||
print(m['To'].defects)
|
||||
|
||||
def test_set_headers_with_defective_header_header_raises(self):
|
||||
m = self._make_message()
|
||||
content = "Simple message.\n"
|
||||
header_factory = self.policy.header_factory
|
||||
with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
|
||||
raw_data_manager.set_content(m, content, headers=(
|
||||
header_factory('To', 'a@fairly@@invalid@address'),)
|
||||
)
|
||||
print(m['To'].defects)
|
||||
|
||||
def test_set_disposition_inline(self):
|
||||
m = self._make_message()
|
||||
m.set_content('foo', disposition='inline')
|
||||
self.assertEqual(m['Content-Disposition'], 'inline')
|
||||
|
||||
def test_set_disposition_attachment(self):
|
||||
m = self._make_message()
|
||||
m.set_content('foo', disposition='attachment')
|
||||
self.assertEqual(m['Content-Disposition'], 'attachment')
|
||||
|
||||
def test_set_disposition_foo(self):
|
||||
m = self._make_message()
|
||||
m.set_content('foo', disposition='foo')
|
||||
self.assertEqual(m['Content-Disposition'], 'foo')
|
||||
|
||||
# XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that
|
||||
# would cause 'foo' above to raise.
|
||||
|
||||
def test_set_filename(self):
|
||||
m = self._make_message()
|
||||
m.set_content('foo', filename='bar.txt')
|
||||
self.assertEqual(m['Content-Disposition'],
|
||||
'attachment; filename="bar.txt"')
|
||||
|
||||
def test_set_filename_and_disposition_inline(self):
|
||||
m = self._make_message()
|
||||
m.set_content('foo', disposition='inline', filename='bar.txt')
|
||||
self.assertEqual(m['Content-Disposition'], 'inline; filename="bar.txt"')
|
||||
|
||||
def test_set_non_ascii_filename(self):
|
||||
m = self._make_message()
|
||||
m.set_content('foo', filename='ábárî.txt')
|
||||
self.assertEqual(bytes(m), textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: 7bit
|
||||
Content-Disposition: attachment;
|
||||
filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt
|
||||
MIME-Version: 1.0
|
||||
|
||||
foo
|
||||
""").encode('ascii'))
|
||||
|
||||
def test_set_content_bytes_cte_7bit(self):
|
||||
m = self._make_message()
|
||||
m.set_content(b'ASCII-only message.\n',
|
||||
maintype='application', subtype='octet-stream', cte='7bit')
|
||||
self.assertEqual(str(m), textwrap.dedent("""\
|
||||
Content-Type: application/octet-stream
|
||||
Content-Transfer-Encoding: 7bit
|
||||
MIME-Version: 1.0
|
||||
|
||||
ASCII-only message.
|
||||
"""))
|
||||
|
||||
content_object_params = {
|
||||
'text_plain': ('content', ()),
|
||||
'text_html': ('content', ('html',)),
|
||||
'application_octet_stream': (b'content',
|
||||
('application', 'octet_stream')),
|
||||
'image_jpeg': (b'content', ('image', 'jpeg')),
|
||||
'message_rfc822': (message(), ()),
|
||||
'message_external_body': (message(), ('external-body',)),
|
||||
}
|
||||
|
||||
def content_object_as_header_receiver(self, obj, mimetype):
|
||||
m = self._make_message()
|
||||
m.set_content(obj, *mimetype, headers=(
|
||||
'To: foo@example.com',
|
||||
'From: bar@simple.net'))
|
||||
self.assertEqual(m['to'], 'foo@example.com')
|
||||
self.assertEqual(m['from'], 'bar@simple.net')
|
||||
|
||||
def content_object_as_disposition_inline_receiver(self, obj, mimetype):
|
||||
m = self._make_message()
|
||||
m.set_content(obj, *mimetype, disposition='inline')
|
||||
self.assertEqual(m['Content-Disposition'], 'inline')
|
||||
|
||||
def content_object_as_non_ascii_filename_receiver(self, obj, mimetype):
|
||||
m = self._make_message()
|
||||
m.set_content(obj, *mimetype, disposition='inline', filename='bár.txt')
|
||||
self.assertEqual(m['Content-Disposition'], 'inline; filename="bár.txt"')
|
||||
self.assertEqual(m.get_filename(), "bár.txt")
|
||||
self.assertEqual(m['Content-Disposition'].params['filename'], "bár.txt")
|
||||
|
||||
def content_object_as_cid_receiver(self, obj, mimetype):
|
||||
m = self._make_message()
|
||||
m.set_content(obj, *mimetype, cid='some_random_stuff')
|
||||
self.assertEqual(m['Content-ID'], 'some_random_stuff')
|
||||
|
||||
def content_object_as_params_receiver(self, obj, mimetype):
|
||||
m = self._make_message()
|
||||
params = {'foo': 'bár', 'abc': 'xyz'}
|
||||
m.set_content(obj, *mimetype, params=params)
|
||||
if isinstance(obj, str):
|
||||
params['charset'] = 'utf-8'
|
||||
self.assertEqual(m['Content-Type'].params, params)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -0,0 +1,337 @@
|
||||
import textwrap
|
||||
import unittest
|
||||
import contextlib
|
||||
from email import policy
|
||||
from email import errors
|
||||
from test.test_email import TestEmailBase
|
||||
|
||||
|
||||
class TestDefectsBase:
|
||||
|
||||
policy = policy.default
|
||||
raise_expected = False
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _raise_point(self, defect):
|
||||
yield
|
||||
|
||||
def test_same_boundary_inner_outer(self):
|
||||
source = textwrap.dedent("""\
|
||||
Subject: XX
|
||||
From: xx@xx.dk
|
||||
To: XX
|
||||
Mime-version: 1.0
|
||||
Content-type: multipart/mixed;
|
||||
boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part
|
||||
Content-type: multipart/alternative;
|
||||
boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part
|
||||
Content-type: text/plain; charset="ISO-8859-1"
|
||||
Content-transfer-encoding: quoted-printable
|
||||
|
||||
text
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part
|
||||
Content-type: text/html; charset="ISO-8859-1"
|
||||
Content-transfer-encoding: quoted-printable
|
||||
|
||||
<HTML></HTML>
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part--
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part
|
||||
Content-type: image/gif; name="xx.gif";
|
||||
Content-disposition: attachment
|
||||
Content-transfer-encoding: base64
|
||||
|
||||
Some removed base64 encoded chars.
|
||||
|
||||
--MS_Mac_OE_3071477847_720252_MIME_Part--
|
||||
|
||||
""")
|
||||
# XXX better would be to actually detect the duplicate.
|
||||
with self._raise_point(errors.StartBoundaryNotFoundDefect):
|
||||
msg = self._str_msg(source)
|
||||
if self.raise_expected: return
|
||||
inner = msg.get_payload(0)
|
||||
self.assertTrue(hasattr(inner, 'defects'))
|
||||
self.assertEqual(len(self.get_defects(inner)), 1)
|
||||
self.assertIsInstance(self.get_defects(inner)[0],
|
||||
errors.StartBoundaryNotFoundDefect)
|
||||
|
||||
def test_multipart_no_boundary(self):
|
||||
source = textwrap.dedent("""\
|
||||
Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
|
||||
From: foobar
|
||||
Subject: broken mail
|
||||
MIME-Version: 1.0
|
||||
Content-Type: multipart/report; report-type=delivery-status;
|
||||
|
||||
--JAB03225.986577786/zinfandel.lacita.com
|
||||
|
||||
One part
|
||||
|
||||
--JAB03225.986577786/zinfandel.lacita.com
|
||||
Content-Type: message/delivery-status
|
||||
|
||||
Header: Another part
|
||||
|
||||
--JAB03225.986577786/zinfandel.lacita.com--
|
||||
""")
|
||||
with self._raise_point(errors.NoBoundaryInMultipartDefect):
|
||||
msg = self._str_msg(source)
|
||||
if self.raise_expected: return
|
||||
self.assertIsInstance(msg.get_payload(), str)
|
||||
self.assertEqual(len(self.get_defects(msg)), 2)
|
||||
self.assertIsInstance(self.get_defects(msg)[0],
|
||||
errors.NoBoundaryInMultipartDefect)
|
||||
self.assertIsInstance(self.get_defects(msg)[1],
|
||||
errors.MultipartInvariantViolationDefect)
|
||||
|
||||
multipart_msg = textwrap.dedent("""\
|
||||
Date: Wed, 14 Nov 2007 12:56:23 GMT
|
||||
From: foo@bar.invalid
|
||||
To: foo@bar.invalid
|
||||
Subject: Content-Transfer-Encoding: base64 and multipart
|
||||
MIME-Version: 1.0
|
||||
Content-Type: multipart/mixed;
|
||||
boundary="===============3344438784458119861=="{}
|
||||
|
||||
--===============3344438784458119861==
|
||||
Content-Type: text/plain
|
||||
|
||||
Test message
|
||||
|
||||
--===============3344438784458119861==
|
||||
Content-Type: application/octet-stream
|
||||
Content-Transfer-Encoding: base64
|
||||
|
||||
YWJj
|
||||
|
||||
--===============3344438784458119861==--
|
||||
""")
|
||||
|
||||
def test_multipart_invalid_cte(self):
|
||||
with self._raise_point(
|
||||
errors.InvalidMultipartContentTransferEncodingDefect):
|
||||
msg = self._str_msg(
|
||||
self.multipart_msg.format(
|
||||
"\nContent-Transfer-Encoding: base64"))
|
||||
if self.raise_expected: return
|
||||
self.assertEqual(len(self.get_defects(msg)), 1)
|
||||
self.assertIsInstance(self.get_defects(msg)[0],
|
||||
errors.InvalidMultipartContentTransferEncodingDefect)
|
||||
|
||||
def test_multipart_no_cte_no_defect(self):
|
||||
if self.raise_expected: return
|
||||
msg = self._str_msg(self.multipart_msg.format(''))
|
||||
self.assertEqual(len(self.get_defects(msg)), 0)
|
||||
|
||||
def test_multipart_valid_cte_no_defect(self):
|
||||
if self.raise_expected: return
|
||||
for cte in ('7bit', '8bit', 'BINary'):
|
||||
msg = self._str_msg(
|
||||
self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
|
||||
self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)
|
||||
|
||||
def test_lying_multipart(self):
|
||||
source = textwrap.dedent("""\
|
||||
From: "Allison Dunlap" <xxx@example.com>
|
||||
To: yyy@example.com
|
||||
Subject: 64423
|
||||
Date: Sun, 11 Jul 2004 16:09:27 -0300
|
||||
MIME-Version: 1.0
|
||||
Content-Type: multipart/alternative;
|
||||
|
||||
Blah blah blah
|
||||
""")
|
||||
with self._raise_point(errors.NoBoundaryInMultipartDefect):
|
||||
msg = self._str_msg(source)
|
||||
if self.raise_expected: return
|
||||
self.assertTrue(hasattr(msg, 'defects'))
|
||||
self.assertEqual(len(self.get_defects(msg)), 2)
|
||||
self.assertIsInstance(self.get_defects(msg)[0],
|
||||
errors.NoBoundaryInMultipartDefect)
|
||||
self.assertIsInstance(self.get_defects(msg)[1],
|
||||
errors.MultipartInvariantViolationDefect)
|
||||
|
||||
def test_missing_start_boundary(self):
|
||||
source = textwrap.dedent("""\
|
||||
Content-Type: multipart/mixed; boundary="AAA"
|
||||
From: Mail Delivery Subsystem <xxx@example.com>
|
||||
To: yyy@example.com
|
||||
|
||||
--AAA
|
||||
|
||||
Stuff
|
||||
|
||||
--AAA
|
||||
Content-Type: message/rfc822
|
||||
|
||||
From: webmaster@python.org
|
||||
To: zzz@example.com
|
||||
Content-Type: multipart/mixed; boundary="BBB"
|
||||
|
||||
--BBB--
|
||||
|
||||
--AAA--
|
||||
|
||||
""")
|
||||
# The message structure is:
|
||||
#
|
||||
# multipart/mixed
|
||||
# text/plain
|
||||
# message/rfc822
|
||||
# multipart/mixed [*]
|
||||
#
|
||||
# [*] This message is missing its start boundary
|
||||
with self._raise_point(errors.StartBoundaryNotFoundDefect):
|
||||
outer = self._str_msg(source)
|
||||
if self.raise_expected: return
|
||||
bad = outer.get_payload(1).get_payload(0)
|
||||
self.assertEqual(len(self.get_defects(bad)), 1)
|
||||
self.assertIsInstance(self.get_defects(bad)[0],
|
||||
errors.StartBoundaryNotFoundDefect)
|
||||
|
||||
def test_first_line_is_continuation_header(self):
|
||||
with self._raise_point(errors.FirstHeaderLineIsContinuationDefect):
|
||||
msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
|
||||
if self.raise_expected: return
|
||||
self.assertEqual(msg.keys(), ['Subject'])
|
||||
self.assertEqual(msg.get_payload(), 'body')
|
||||
self.assertEqual(len(self.get_defects(msg)), 1)
|
||||
self.assertDefectsEqual(self.get_defects(msg),
|
||||
[errors.FirstHeaderLineIsContinuationDefect])
|
||||
self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')
|
||||
|
||||
def test_missing_header_body_separator(self):
|
||||
# Our heuristic if we see a line that doesn't look like a header (no
|
||||
# leading whitespace but no ':') is to assume that the blank line that
|
||||
# separates the header from the body is missing, and to stop parsing
|
||||
# headers and start parsing the body.
|
||||
with self._raise_point(errors.MissingHeaderBodySeparatorDefect):
|
||||
msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
|
||||
if self.raise_expected: return
|
||||
self.assertEqual(msg.keys(), ['Subject'])
|
||||
self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
|
||||
self.assertDefectsEqual(self.get_defects(msg),
|
||||
[errors.MissingHeaderBodySeparatorDefect])
|
||||
|
||||
def test_bad_padding_in_base64_payload(self):
|
||||
source = textwrap.dedent("""\
|
||||
Subject: test
|
||||
MIME-Version: 1.0
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: base64
|
||||
|
||||
dmk
|
||||
""")
|
||||
msg = self._str_msg(source)
|
||||
with self._raise_point(errors.InvalidBase64PaddingDefect):
|
||||
payload = msg.get_payload(decode=True)
|
||||
if self.raise_expected: return
|
||||
self.assertEqual(payload, b'vi')
|
||||
self.assertDefectsEqual(self.get_defects(msg),
|
||||
[errors.InvalidBase64PaddingDefect])
|
||||
|
||||
def test_invalid_chars_in_base64_payload(self):
|
||||
source = textwrap.dedent("""\
|
||||
Subject: test
|
||||
MIME-Version: 1.0
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: base64
|
||||
|
||||
dm\x01k===
|
||||
""")
|
||||
msg = self._str_msg(source)
|
||||
with self._raise_point(errors.InvalidBase64CharactersDefect):
|
||||
payload = msg.get_payload(decode=True)
|
||||
if self.raise_expected: return
|
||||
self.assertEqual(payload, b'vi')
|
||||
self.assertDefectsEqual(self.get_defects(msg),
|
||||
[errors.InvalidBase64CharactersDefect])
|
||||
|
||||
def test_invalid_length_of_base64_payload(self):
|
||||
source = textwrap.dedent("""\
|
||||
Subject: test
|
||||
MIME-Version: 1.0
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: base64
|
||||
|
||||
abcde
|
||||
""")
|
||||
msg = self._str_msg(source)
|
||||
with self._raise_point(errors.InvalidBase64LengthDefect):
|
||||
payload = msg.get_payload(decode=True)
|
||||
if self.raise_expected: return
|
||||
self.assertEqual(payload, b'abcde')
|
||||
self.assertDefectsEqual(self.get_defects(msg),
|
||||
[errors.InvalidBase64LengthDefect])
|
||||
|
||||
def test_missing_ending_boundary(self):
|
||||
source = textwrap.dedent("""\
|
||||
To: 1@harrydomain4.com
|
||||
Subject: Fwd: 1
|
||||
MIME-Version: 1.0
|
||||
Content-Type: multipart/alternative;
|
||||
boundary="------------000101020201080900040301"
|
||||
|
||||
--------------000101020201080900040301
|
||||
Content-Type: text/plain; charset=ISO-8859-1
|
||||
Content-Transfer-Encoding: 7bit
|
||||
|
||||
Alternative 1
|
||||
|
||||
--------------000101020201080900040301
|
||||
Content-Type: text/html; charset=ISO-8859-1
|
||||
Content-Transfer-Encoding: 7bit
|
||||
|
||||
Alternative 2
|
||||
|
||||
""")
|
||||
with self._raise_point(errors.CloseBoundaryNotFoundDefect):
|
||||
msg = self._str_msg(source)
|
||||
if self.raise_expected: return
|
||||
self.assertEqual(len(msg.get_payload()), 2)
|
||||
self.assertEqual(msg.get_payload(1).get_payload(), 'Alternative 2\n')
|
||||
self.assertDefectsEqual(self.get_defects(msg),
|
||||
[errors.CloseBoundaryNotFoundDefect])
|
||||
|
||||
|
||||
class TestDefectDetection(TestDefectsBase, TestEmailBase):
|
||||
|
||||
def get_defects(self, obj):
|
||||
return obj.defects
|
||||
|
||||
|
||||
class TestDefectCapture(TestDefectsBase, TestEmailBase):
|
||||
|
||||
class CapturePolicy(policy.EmailPolicy):
|
||||
captured = None
|
||||
def register_defect(self, obj, defect):
|
||||
self.captured.append(defect)
|
||||
|
||||
def setUp(self):
|
||||
self.policy = self.CapturePolicy(captured=list())
|
||||
|
||||
def get_defects(self, obj):
|
||||
return self.policy.captured
|
||||
|
||||
|
||||
class TestDefectRaising(TestDefectsBase, TestEmailBase):
|
||||
|
||||
policy = TestDefectsBase.policy
|
||||
policy = policy.clone(raise_on_defect=True)
|
||||
raise_expected = True
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _raise_point(self, defect):
|
||||
with self.assertRaises(defect):
|
||||
yield
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
5805
bin/python/python-3.13/Lib/test/test_email/test_email.py
Normal file
5805
bin/python/python-3.13/Lib/test/test_email/test_email.py
Normal file
File diff suppressed because it is too large
Load Diff
477
bin/python/python-3.13/Lib/test/test_email/test_generator.py
Normal file
477
bin/python/python-3.13/Lib/test/test_email/test_generator.py
Normal file
@@ -0,0 +1,477 @@
|
||||
import io
|
||||
import textwrap
|
||||
import unittest
|
||||
from email import message_from_string, message_from_bytes
|
||||
from email.message import EmailMessage
|
||||
from email.generator import Generator, BytesGenerator
|
||||
from email.headerregistry import Address
|
||||
from email import policy
|
||||
import email.errors
|
||||
from test.test_email import TestEmailBase, parameterize
|
||||
|
||||
|
||||
@parameterize
|
||||
class TestGeneratorBase:
|
||||
|
||||
policy = policy.default
|
||||
|
||||
def msgmaker(self, msg, policy=None):
|
||||
policy = self.policy if policy is None else policy
|
||||
return self.msgfunc(msg, policy=policy)
|
||||
|
||||
refold_long_expected = {
|
||||
0: textwrap.dedent("""\
|
||||
To: whom_it_may_concern@example.com
|
||||
From: nobody_you_want_to_know@example.com
|
||||
Subject: We the willing led by the unknowing are doing the
|
||||
impossible for the ungrateful. We have done so much for so long with so little
|
||||
we are now qualified to do anything with nothing.
|
||||
|
||||
None
|
||||
"""),
|
||||
40: textwrap.dedent("""\
|
||||
To: whom_it_may_concern@example.com
|
||||
From:
|
||||
nobody_you_want_to_know@example.com
|
||||
Subject: We the willing led by the
|
||||
unknowing are doing the impossible for
|
||||
the ungrateful. We have done so much
|
||||
for so long with so little we are now
|
||||
qualified to do anything with nothing.
|
||||
|
||||
None
|
||||
"""),
|
||||
20: textwrap.dedent("""\
|
||||
To:
|
||||
whom_it_may_concern@example.com
|
||||
From:
|
||||
nobody_you_want_to_know@example.com
|
||||
Subject: We the
|
||||
willing led by the
|
||||
unknowing are doing
|
||||
the impossible for
|
||||
the ungrateful. We
|
||||
have done so much
|
||||
for so long with so
|
||||
little we are now
|
||||
qualified to do
|
||||
anything with
|
||||
nothing.
|
||||
|
||||
None
|
||||
"""),
|
||||
}
|
||||
refold_long_expected[100] = refold_long_expected[0]
|
||||
|
||||
refold_all_expected = refold_long_expected.copy()
|
||||
refold_all_expected[0] = (
|
||||
"To: whom_it_may_concern@example.com\n"
|
||||
"From: nobody_you_want_to_know@example.com\n"
|
||||
"Subject: We the willing led by the unknowing are doing the "
|
||||
"impossible for the ungrateful. We have done so much for "
|
||||
"so long with so little we are now qualified to do anything "
|
||||
"with nothing.\n"
|
||||
"\n"
|
||||
"None\n")
|
||||
refold_all_expected[100] = (
|
||||
"To: whom_it_may_concern@example.com\n"
|
||||
"From: nobody_you_want_to_know@example.com\n"
|
||||
"Subject: We the willing led by the unknowing are doing the "
|
||||
"impossible for the ungrateful. We have\n"
|
||||
" done so much for so long with so little we are now qualified "
|
||||
"to do anything with nothing.\n"
|
||||
"\n"
|
||||
"None\n")
|
||||
|
||||
length_params = [n for n in refold_long_expected]
|
||||
|
||||
def length_as_maxheaderlen_parameter(self, n):
|
||||
msg = self.msgmaker(self.typ(self.refold_long_expected[0]))
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, maxheaderlen=n, policy=self.policy)
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(self.refold_long_expected[n]))
|
||||
|
||||
def length_as_max_line_length_policy(self, n):
|
||||
msg = self.msgmaker(self.typ(self.refold_long_expected[0]))
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, policy=self.policy.clone(max_line_length=n))
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(self.refold_long_expected[n]))
|
||||
|
||||
def length_as_maxheaderlen_parm_overrides_policy(self, n):
|
||||
msg = self.msgmaker(self.typ(self.refold_long_expected[0]))
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, maxheaderlen=n,
|
||||
policy=self.policy.clone(max_line_length=10))
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(self.refold_long_expected[n]))
|
||||
|
||||
def length_as_max_line_length_with_refold_none_does_not_fold(self, n):
|
||||
msg = self.msgmaker(self.typ(self.refold_long_expected[0]))
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, policy=self.policy.clone(refold_source='none',
|
||||
max_line_length=n))
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(self.refold_long_expected[0]))
|
||||
|
||||
def length_as_max_line_length_with_refold_all_folds(self, n):
|
||||
msg = self.msgmaker(self.typ(self.refold_long_expected[0]))
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, policy=self.policy.clone(refold_source='all',
|
||||
max_line_length=n))
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(self.refold_all_expected[n]))
|
||||
|
||||
def test_crlf_control_via_policy(self):
|
||||
source = "Subject: test\r\n\r\ntest body\r\n"
|
||||
expected = source
|
||||
msg = self.msgmaker(self.typ(source))
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, policy=policy.SMTP)
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(expected))
|
||||
|
||||
def test_flatten_linesep_overrides_policy(self):
|
||||
source = "Subject: test\n\ntest body\n"
|
||||
expected = source
|
||||
msg = self.msgmaker(self.typ(source))
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, policy=policy.SMTP)
|
||||
g.flatten(msg, linesep='\n')
|
||||
self.assertEqual(s.getvalue(), self.typ(expected))
|
||||
|
||||
def test_flatten_linesep(self):
|
||||
source = 'Subject: one\n two\r three\r\n four\r\n\r\ntest body\r\n'
|
||||
msg = self.msgmaker(self.typ(source))
|
||||
self.assertEqual(msg['Subject'], 'one two three four')
|
||||
|
||||
expected = 'Subject: one\n two\n three\n four\n\ntest body\n'
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s)
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(expected))
|
||||
|
||||
expected = 'Subject: one two three four\n\ntest body\n'
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, policy=self.policy.clone(refold_source='all'))
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(expected))
|
||||
|
||||
def test_flatten_control_linesep(self):
|
||||
source = 'Subject: one\v two\f three\x1c four\x1d five\x1e six\r\n\r\ntest body\r\n'
|
||||
msg = self.msgmaker(self.typ(source))
|
||||
self.assertEqual(msg['Subject'], 'one\v two\f three\x1c four\x1d five\x1e six')
|
||||
|
||||
expected = 'Subject: one\v two\f three\x1c four\x1d five\x1e six\n\ntest body\n'
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s)
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(expected))
|
||||
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, policy=self.policy.clone(refold_source='all'))
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(expected))
|
||||
|
||||
def test_set_mangle_from_via_policy(self):
|
||||
source = textwrap.dedent("""\
|
||||
Subject: test that
|
||||
from is mangled in the body!
|
||||
|
||||
From time to time I write a rhyme.
|
||||
""")
|
||||
variants = (
|
||||
(None, True),
|
||||
(policy.compat32, True),
|
||||
(policy.default, False),
|
||||
(policy.default.clone(mangle_from_=True), True),
|
||||
)
|
||||
for p, mangle in variants:
|
||||
expected = source.replace('From ', '>From ') if mangle else source
|
||||
with self.subTest(policy=p, mangle_from_=mangle):
|
||||
msg = self.msgmaker(self.typ(source))
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, policy=p)
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(expected))
|
||||
|
||||
def test_compat32_max_line_length_does_not_fold_when_none(self):
|
||||
msg = self.msgmaker(self.typ(self.refold_long_expected[0]))
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, policy=policy.compat32.clone(max_line_length=None))
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(self.refold_long_expected[0]))
|
||||
|
||||
def test_rfc2231_wrapping(self):
|
||||
# This is pretty much just to make sure we don't have an infinite
|
||||
# loop; I don't expect anyone to hit this in the field.
|
||||
msg = self.msgmaker(self.typ(textwrap.dedent("""\
|
||||
To: nobody
|
||||
Content-Disposition: attachment;
|
||||
filename="afilenamelongenoghtowraphere"
|
||||
|
||||
None
|
||||
""")))
|
||||
expected = textwrap.dedent("""\
|
||||
To: nobody
|
||||
Content-Disposition: attachment;
|
||||
filename*0*=us-ascii''afilename;
|
||||
filename*1*=longenoghtowraphere
|
||||
|
||||
None
|
||||
""")
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, policy=self.policy.clone(max_line_length=33))
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(expected))
|
||||
|
||||
def test_rfc2231_wrapping_switches_to_default_len_if_too_narrow(self):
|
||||
# This is just to make sure we don't have an infinite loop; I don't
|
||||
# expect anyone to hit this in the field, so I'm not bothering to make
|
||||
# the result optimal (the encoding isn't needed).
|
||||
msg = self.msgmaker(self.typ(textwrap.dedent("""\
|
||||
To: nobody
|
||||
Content-Disposition: attachment;
|
||||
filename="afilenamelongenoghtowraphere"
|
||||
|
||||
None
|
||||
""")))
|
||||
expected = textwrap.dedent("""\
|
||||
To: nobody
|
||||
Content-Disposition:
|
||||
attachment;
|
||||
filename*0*=us-ascii''afilenamelongenoghtowraphere
|
||||
|
||||
None
|
||||
""")
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, policy=self.policy.clone(max_line_length=20))
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(expected))
|
||||
|
||||
def test_keep_encoded_newlines(self):
|
||||
msg = self.msgmaker(self.typ(textwrap.dedent("""\
|
||||
To: nobody
|
||||
Subject: Bad subject=?UTF-8?Q?=0A?=Bcc: injection@example.com
|
||||
|
||||
None
|
||||
""")))
|
||||
expected = textwrap.dedent("""\
|
||||
To: nobody
|
||||
Subject: Bad subject=?UTF-8?Q?=0A?=Bcc: injection@example.com
|
||||
|
||||
None
|
||||
""")
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, policy=self.policy.clone(max_line_length=80))
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(expected))
|
||||
|
||||
def test_keep_long_encoded_newlines(self):
|
||||
msg = self.msgmaker(self.typ(textwrap.dedent("""\
|
||||
To: nobody
|
||||
Subject: Bad subject=?UTF-8?Q?=0A?=Bcc: injection@example.com
|
||||
|
||||
None
|
||||
""")))
|
||||
expected = textwrap.dedent("""\
|
||||
To: nobody
|
||||
Subject: Bad subject
|
||||
=?utf-8?q?=0A?=Bcc:
|
||||
injection@example.com
|
||||
|
||||
None
|
||||
""")
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, policy=self.policy.clone(max_line_length=30))
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(expected))
|
||||
|
||||
|
||||
class TestGenerator(TestGeneratorBase, TestEmailBase):
|
||||
|
||||
msgfunc = staticmethod(message_from_string)
|
||||
genclass = Generator
|
||||
ioclass = io.StringIO
|
||||
typ = str
|
||||
|
||||
def test_flatten_unicode_linesep(self):
|
||||
source = 'Subject: one\x85 two\u2028 three\u2029 four\r\n\r\ntest body\r\n'
|
||||
msg = self.msgmaker(self.typ(source))
|
||||
self.assertEqual(msg['Subject'], 'one\x85 two\u2028 three\u2029 four')
|
||||
|
||||
expected = 'Subject: =?utf-8?b?b25lwoUgdHdv4oCoIHRocmVl4oCp?= four\n\ntest body\n'
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s)
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(expected))
|
||||
|
||||
s = self.ioclass()
|
||||
g = self.genclass(s, policy=self.policy.clone(refold_source='all'))
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), self.typ(expected))
|
||||
|
||||
def test_verify_generated_headers(self):
|
||||
"""gh-121650: by default the generator prevents header injection"""
|
||||
class LiteralHeader(str):
|
||||
name = 'Header'
|
||||
def fold(self, **kwargs):
|
||||
return self
|
||||
|
||||
for text in (
|
||||
'Value\r\nBad Injection\r\n',
|
||||
'NoNewLine'
|
||||
):
|
||||
with self.subTest(text=text):
|
||||
message = message_from_string(
|
||||
"Header: Value\r\n\r\nBody",
|
||||
policy=self.policy,
|
||||
)
|
||||
|
||||
del message['Header']
|
||||
message['Header'] = LiteralHeader(text)
|
||||
|
||||
with self.assertRaises(email.errors.HeaderWriteError):
|
||||
message.as_string()
|
||||
|
||||
|
||||
class TestBytesGenerator(TestGeneratorBase, TestEmailBase):
|
||||
|
||||
msgfunc = staticmethod(message_from_bytes)
|
||||
genclass = BytesGenerator
|
||||
ioclass = io.BytesIO
|
||||
typ = lambda self, x: x.encode('ascii')
|
||||
|
||||
def test_defaults_handle_spaces_between_encoded_words_when_folded(self):
|
||||
source = ("Уведомление о принятии в работу обращения для"
|
||||
" подключения услуги")
|
||||
expected = ('Subject: =?utf-8?b?0KPQstC10LTQvtC80LvQtdC90LjQtSDQviDQv9GA0LjQvdGP0YLQuNC4?=\n'
|
||||
' =?utf-8?b?INCyINGA0LDQsdC+0YLRgyDQvtCx0YDQsNGJ0LXQvdC40Y8g0LTQu9GPINC/0L4=?=\n'
|
||||
' =?utf-8?b?0LTQutC70Y7Rh9C10L3QuNGPINGD0YHQu9GD0LPQuA==?=\n\n').encode('ascii')
|
||||
msg = EmailMessage()
|
||||
msg['Subject'] = source
|
||||
s = io.BytesIO()
|
||||
g = BytesGenerator(s)
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), expected)
|
||||
|
||||
def test_defaults_handle_spaces_when_encoded_words_is_folded_in_middle(self):
|
||||
source = ('A very long long long long long long long long long long long long '
|
||||
'long long long long long long long long long long long súmmäry')
|
||||
expected = ('Subject: A very long long long long long long long long long long long long\n'
|
||||
' long long long long long long long long long long long =?utf-8?q?s=C3=BAmm?=\n'
|
||||
' =?utf-8?q?=C3=A4ry?=\n\n').encode('ascii')
|
||||
msg = EmailMessage()
|
||||
msg['Subject'] = source
|
||||
s = io.BytesIO()
|
||||
g = BytesGenerator(s)
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), expected)
|
||||
|
||||
def test_defaults_handle_spaces_at_start_of_subject(self):
|
||||
source = " Уведомление"
|
||||
expected = b"Subject: =?utf-8?b?0KPQstC10LTQvtC80LvQtdC90LjQtQ==?=\n\n"
|
||||
msg = EmailMessage()
|
||||
msg['Subject'] = source
|
||||
s = io.BytesIO()
|
||||
g = BytesGenerator(s)
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), expected)
|
||||
|
||||
def test_defaults_handle_spaces_at_start_of_continuation_line(self):
|
||||
source = " ф ффффффффффффффффффф ф ф"
|
||||
expected = (b"Subject: "
|
||||
b"=?utf-8?b?0YQg0YTRhNGE0YTRhNGE0YTRhNGE0YTRhNGE0YTRhNGE0YTRhNGE0YQ=?=\n"
|
||||
b" =?utf-8?b?INGEINGE?=\n\n")
|
||||
msg = EmailMessage()
|
||||
msg['Subject'] = source
|
||||
s = io.BytesIO()
|
||||
g = BytesGenerator(s)
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), expected)
|
||||
|
||||
def test_cte_type_7bit_handles_unknown_8bit(self):
|
||||
source = ("Subject: Maintenant je vous présente mon "
|
||||
"collègue\n\n").encode('utf-8')
|
||||
expected = ('Subject: Maintenant je vous =?unknown-8bit?q?'
|
||||
'pr=C3=A9sente_mon_coll=C3=A8gue?=\n\n').encode('ascii')
|
||||
msg = message_from_bytes(source)
|
||||
s = io.BytesIO()
|
||||
g = BytesGenerator(s, policy=self.policy.clone(cte_type='7bit'))
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), expected)
|
||||
|
||||
def test_cte_type_7bit_transforms_8bit_cte(self):
|
||||
source = textwrap.dedent("""\
|
||||
From: foo@bar.com
|
||||
To: Dinsdale
|
||||
Subject: Nudge nudge, wink, wink
|
||||
Mime-Version: 1.0
|
||||
Content-Type: text/plain; charset="latin-1"
|
||||
Content-Transfer-Encoding: 8bit
|
||||
|
||||
oh là là, know what I mean, know what I mean?
|
||||
""").encode('latin1')
|
||||
msg = message_from_bytes(source)
|
||||
expected = textwrap.dedent("""\
|
||||
From: foo@bar.com
|
||||
To: Dinsdale
|
||||
Subject: Nudge nudge, wink, wink
|
||||
Mime-Version: 1.0
|
||||
Content-Type: text/plain; charset="iso-8859-1"
|
||||
Content-Transfer-Encoding: quoted-printable
|
||||
|
||||
oh l=E0 l=E0, know what I mean, know what I mean?
|
||||
""").encode('ascii')
|
||||
s = io.BytesIO()
|
||||
g = BytesGenerator(s, policy=self.policy.clone(cte_type='7bit',
|
||||
linesep='\n'))
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), expected)
|
||||
|
||||
def test_smtputf8_policy(self):
|
||||
msg = EmailMessage()
|
||||
msg['From'] = "Páolo <főo@bar.com>"
|
||||
msg['To'] = 'Dinsdale'
|
||||
msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
|
||||
msg.set_content("oh là là, know what I mean, know what I mean?")
|
||||
expected = textwrap.dedent("""\
|
||||
From: Páolo <főo@bar.com>
|
||||
To: Dinsdale
|
||||
Subject: Nudge nudge, wink, wink \u1F609
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: 8bit
|
||||
MIME-Version: 1.0
|
||||
|
||||
oh là là, know what I mean, know what I mean?
|
||||
""").encode('utf-8').replace(b'\n', b'\r\n')
|
||||
s = io.BytesIO()
|
||||
g = BytesGenerator(s, policy=policy.SMTPUTF8)
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), expected)
|
||||
|
||||
def test_smtp_policy(self):
|
||||
msg = EmailMessage()
|
||||
msg["From"] = Address(addr_spec="foo@bar.com", display_name="Páolo")
|
||||
msg["To"] = Address(addr_spec="bar@foo.com", display_name="Dinsdale")
|
||||
msg["Subject"] = "Nudge nudge, wink, wink"
|
||||
msg.set_content("oh boy, know what I mean, know what I mean?")
|
||||
expected = textwrap.dedent("""\
|
||||
From: =?utf-8?q?P=C3=A1olo?= <foo@bar.com>
|
||||
To: Dinsdale <bar@foo.com>
|
||||
Subject: Nudge nudge, wink, wink
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: 7bit
|
||||
MIME-Version: 1.0
|
||||
|
||||
oh boy, know what I mean, know what I mean?
|
||||
""").encode().replace(b"\n", b"\r\n")
|
||||
s = io.BytesIO()
|
||||
g = BytesGenerator(s, policy=policy.SMTP)
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), expected)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
1812
bin/python/python-3.13/Lib/test/test_email/test_headerregistry.py
Normal file
1812
bin/python/python-3.13/Lib/test/test_email/test_headerregistry.py
Normal file
File diff suppressed because it is too large
Load Diff
78
bin/python/python-3.13/Lib/test/test_email/test_inversion.py
Normal file
78
bin/python/python-3.13/Lib/test/test_email/test_inversion.py
Normal file
@@ -0,0 +1,78 @@
|
||||
"""Test the parser and generator are inverses.
|
||||
|
||||
Note that this is only strictly true if we are parsing RFC valid messages and
|
||||
producing RFC valid messages.
|
||||
"""
|
||||
|
||||
import io
|
||||
import unittest
|
||||
from email import policy, message_from_bytes
|
||||
from email.message import EmailMessage
|
||||
from email.generator import BytesGenerator
|
||||
from test.test_email import TestEmailBase, parameterize
|
||||
|
||||
# This is like textwrap.dedent for bytes, except that it uses \r\n for the line
|
||||
# separators on the rebuilt string.
|
||||
def dedent(bstr):
|
||||
lines = bstr.splitlines()
|
||||
if not lines[0].strip():
|
||||
raise ValueError("First line must contain text")
|
||||
stripamt = len(lines[0]) - len(lines[0].lstrip())
|
||||
return b'\r\n'.join(
|
||||
[x[stripamt:] if len(x)>=stripamt else b''
|
||||
for x in lines])
|
||||
|
||||
|
||||
@parameterize
|
||||
class TestInversion(TestEmailBase):
|
||||
|
||||
policy = policy.default
|
||||
message = EmailMessage
|
||||
|
||||
def msg_as_input(self, msg):
|
||||
m = message_from_bytes(msg, policy=policy.SMTP)
|
||||
b = io.BytesIO()
|
||||
g = BytesGenerator(b)
|
||||
g.flatten(m)
|
||||
self.assertEqual(b.getvalue(), msg)
|
||||
|
||||
# XXX: spaces are not preserved correctly here yet in the general case.
|
||||
msg_params = {
|
||||
'header_with_one_space_body': (dedent(b"""\
|
||||
From: abc@xyz.com
|
||||
X-Status:\x20
|
||||
Subject: test
|
||||
|
||||
foo
|
||||
"""),),
|
||||
|
||||
'header_with_invalid_date': (dedent(b"""\
|
||||
Date: Tue, 06 Jun 2017 27:39:33 +0600
|
||||
From: abc@xyz.com
|
||||
Subject: timezones
|
||||
|
||||
How do they work even?
|
||||
"""),),
|
||||
|
||||
}
|
||||
|
||||
payload_params = {
|
||||
'plain_text': dict(payload='This is a test\n'*20),
|
||||
'base64_text': dict(payload=(('xy a'*40+'\n')*5), cte='base64'),
|
||||
'qp_text': dict(payload=(('xy a'*40+'\n')*5), cte='quoted-printable'),
|
||||
}
|
||||
|
||||
def payload_as_body(self, payload, **kw):
|
||||
msg = self._make_message()
|
||||
msg['From'] = 'foo'
|
||||
msg['To'] = 'bar'
|
||||
msg['Subject'] = 'payload round trip test'
|
||||
msg.set_content(payload, **kw)
|
||||
b = bytes(msg)
|
||||
msg2 = message_from_bytes(b, policy=self.policy)
|
||||
self.assertEqual(bytes(msg2), b)
|
||||
self.assertEqual(msg2.get_content(), payload)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
1013
bin/python/python-3.13/Lib/test/test_email/test_message.py
Normal file
1013
bin/python/python-3.13/Lib/test/test_email/test_message.py
Normal file
File diff suppressed because it is too large
Load Diff
110
bin/python/python-3.13/Lib/test/test_email/test_parser.py
Normal file
110
bin/python/python-3.13/Lib/test/test_email/test_parser.py
Normal file
@@ -0,0 +1,110 @@
|
||||
import io
|
||||
import email
|
||||
import unittest
|
||||
from email.message import Message, EmailMessage
|
||||
from email.policy import default
|
||||
from test.test_email import TestEmailBase
|
||||
|
||||
|
||||
class TestCustomMessage(TestEmailBase):
|
||||
|
||||
class MyMessage(Message):
|
||||
def __init__(self, policy):
|
||||
self.check_policy = policy
|
||||
super().__init__()
|
||||
|
||||
MyPolicy = TestEmailBase.policy.clone(linesep='boo')
|
||||
|
||||
def test_custom_message_gets_policy_if_possible_from_string(self):
|
||||
msg = email.message_from_string("Subject: bogus\n\nmsg\n",
|
||||
self.MyMessage,
|
||||
policy=self.MyPolicy)
|
||||
self.assertIsInstance(msg, self.MyMessage)
|
||||
self.assertIs(msg.check_policy, self.MyPolicy)
|
||||
|
||||
def test_custom_message_gets_policy_if_possible_from_file(self):
|
||||
source_file = io.StringIO("Subject: bogus\n\nmsg\n")
|
||||
msg = email.message_from_file(source_file,
|
||||
self.MyMessage,
|
||||
policy=self.MyPolicy)
|
||||
self.assertIsInstance(msg, self.MyMessage)
|
||||
self.assertIs(msg.check_policy, self.MyPolicy)
|
||||
|
||||
# XXX add tests for other functions that take Message arg.
|
||||
|
||||
|
||||
class TestParserBase:
|
||||
|
||||
def test_only_split_on_cr_lf(self):
|
||||
# The unicode line splitter splits on unicode linebreaks, which are
|
||||
# more numerous than allowed by the email RFCs; make sure we are only
|
||||
# splitting on those two.
|
||||
for parser in self.parsers:
|
||||
with self.subTest(parser=parser.__name__):
|
||||
msg = parser(
|
||||
"Next-Line: not\x85broken\r\n"
|
||||
"Null: not\x00broken\r\n"
|
||||
"Vertical-Tab: not\vbroken\r\n"
|
||||
"Form-Feed: not\fbroken\r\n"
|
||||
"File-Separator: not\x1Cbroken\r\n"
|
||||
"Group-Separator: not\x1Dbroken\r\n"
|
||||
"Record-Separator: not\x1Ebroken\r\n"
|
||||
"Line-Separator: not\u2028broken\r\n"
|
||||
"Paragraph-Separator: not\u2029broken\r\n"
|
||||
"\r\n",
|
||||
policy=default,
|
||||
)
|
||||
self.assertEqual(msg.items(), [
|
||||
("Next-Line", "not\x85broken"),
|
||||
("Null", "not\x00broken"),
|
||||
("Vertical-Tab", "not\vbroken"),
|
||||
("Form-Feed", "not\fbroken"),
|
||||
("File-Separator", "not\x1Cbroken"),
|
||||
("Group-Separator", "not\x1Dbroken"),
|
||||
("Record-Separator", "not\x1Ebroken"),
|
||||
("Line-Separator", "not\u2028broken"),
|
||||
("Paragraph-Separator", "not\u2029broken"),
|
||||
])
|
||||
self.assertEqual(msg.get_payload(), "")
|
||||
|
||||
class MyMessage(EmailMessage):
|
||||
pass
|
||||
|
||||
def test_custom_message_factory_on_policy(self):
|
||||
for parser in self.parsers:
|
||||
with self.subTest(parser=parser.__name__):
|
||||
MyPolicy = default.clone(message_factory=self.MyMessage)
|
||||
msg = parser("To: foo\n\ntest", policy=MyPolicy)
|
||||
self.assertIsInstance(msg, self.MyMessage)
|
||||
|
||||
def test_factory_arg_overrides_policy(self):
|
||||
for parser in self.parsers:
|
||||
with self.subTest(parser=parser.__name__):
|
||||
MyPolicy = default.clone(message_factory=self.MyMessage)
|
||||
msg = parser("To: foo\n\ntest", Message, policy=MyPolicy)
|
||||
self.assertNotIsInstance(msg, self.MyMessage)
|
||||
self.assertIsInstance(msg, Message)
|
||||
|
||||
# Play some games to get nice output in subTest. This code could be clearer
|
||||
# if staticmethod supported __name__.
|
||||
|
||||
def message_from_file(s, *args, **kw):
|
||||
f = io.StringIO(s)
|
||||
return email.message_from_file(f, *args, **kw)
|
||||
|
||||
class TestParser(TestParserBase, TestEmailBase):
|
||||
parsers = (email.message_from_string, message_from_file)
|
||||
|
||||
def message_from_bytes(s, *args, **kw):
|
||||
return email.message_from_bytes(s.encode(), *args, **kw)
|
||||
|
||||
def message_from_binary_file(s, *args, **kw):
|
||||
f = io.BytesIO(s.encode())
|
||||
return email.message_from_binary_file(f, *args, **kw)
|
||||
|
||||
class TestBytesParser(TestParserBase, TestEmailBase):
|
||||
parsers = (message_from_bytes, message_from_binary_file)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -0,0 +1,76 @@
|
||||
import unittest
|
||||
import textwrap
|
||||
import copy
|
||||
import pickle
|
||||
import email
|
||||
import email.message
|
||||
from email import policy
|
||||
from email.headerregistry import HeaderRegistry
|
||||
from test.test_email import TestEmailBase, parameterize
|
||||
|
||||
|
||||
@parameterize
|
||||
class TestPickleCopyHeader(TestEmailBase):
|
||||
|
||||
header_factory = HeaderRegistry()
|
||||
|
||||
unstructured = header_factory('subject', 'this is a test')
|
||||
|
||||
header_params = {
|
||||
'subject': ('subject', 'this is a test'),
|
||||
'from': ('from', 'frodo@mordor.net'),
|
||||
'to': ('to', 'a: k@b.com, y@z.com;, j@f.com'),
|
||||
'date': ('date', 'Tue, 29 May 2012 09:24:26 +1000'),
|
||||
}
|
||||
|
||||
def header_as_deepcopy(self, name, value):
|
||||
header = self.header_factory(name, value)
|
||||
h = copy.deepcopy(header)
|
||||
self.assertEqual(str(h), str(header))
|
||||
|
||||
def header_as_pickle(self, name, value):
|
||||
header = self.header_factory(name, value)
|
||||
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
|
||||
p = pickle.dumps(header, proto)
|
||||
h = pickle.loads(p)
|
||||
self.assertEqual(str(h), str(header))
|
||||
|
||||
|
||||
@parameterize
|
||||
class TestPickleCopyMessage(TestEmailBase):
|
||||
|
||||
# Message objects are a sequence, so we have to make them a one-tuple in
|
||||
# msg_params so they get passed to the parameterized test method as a
|
||||
# single argument instead of as a list of headers.
|
||||
msg_params = {}
|
||||
|
||||
# Note: there will be no custom header objects in the parsed message.
|
||||
msg_params['parsed'] = (email.message_from_string(textwrap.dedent("""\
|
||||
Date: Tue, 29 May 2012 09:24:26 +1000
|
||||
From: frodo@mordor.net
|
||||
To: bilbo@underhill.org
|
||||
Subject: help
|
||||
|
||||
I think I forgot the ring.
|
||||
"""), policy=policy.default),)
|
||||
|
||||
msg_params['created'] = (email.message.Message(policy=policy.default),)
|
||||
msg_params['created'][0]['Date'] = 'Tue, 29 May 2012 09:24:26 +1000'
|
||||
msg_params['created'][0]['From'] = 'frodo@mordor.net'
|
||||
msg_params['created'][0]['To'] = 'bilbo@underhill.org'
|
||||
msg_params['created'][0]['Subject'] = 'help'
|
||||
msg_params['created'][0].set_payload('I think I forgot the ring.')
|
||||
|
||||
def msg_as_deepcopy(self, msg):
|
||||
msg2 = copy.deepcopy(msg)
|
||||
self.assertEqual(msg2.as_string(), msg.as_string())
|
||||
|
||||
def msg_as_pickle(self, msg):
|
||||
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
|
||||
p = pickle.dumps(msg, proto)
|
||||
msg2 = pickle.loads(p)
|
||||
self.assertEqual(msg2.as_string(), msg.as_string())
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
429
bin/python/python-3.13/Lib/test/test_email/test_policy.py
Normal file
429
bin/python/python-3.13/Lib/test/test_email/test_policy.py
Normal file
@@ -0,0 +1,429 @@
|
||||
import io
|
||||
import types
|
||||
import textwrap
|
||||
import unittest
|
||||
import email.errors
|
||||
import email.policy
|
||||
import email.parser
|
||||
import email.generator
|
||||
import email.message
|
||||
from email import headerregistry
|
||||
|
||||
def make_defaults(base_defaults, differences):
|
||||
defaults = base_defaults.copy()
|
||||
defaults.update(differences)
|
||||
return defaults
|
||||
|
||||
class PolicyAPITests(unittest.TestCase):
|
||||
|
||||
longMessage = True
|
||||
|
||||
# Base default values.
|
||||
compat32_defaults = {
|
||||
'max_line_length': 78,
|
||||
'linesep': '\n',
|
||||
'cte_type': '8bit',
|
||||
'raise_on_defect': False,
|
||||
'mangle_from_': True,
|
||||
'message_factory': None,
|
||||
'verify_generated_headers': True,
|
||||
}
|
||||
# These default values are the ones set on email.policy.default.
|
||||
# If any of these defaults change, the docs must be updated.
|
||||
policy_defaults = compat32_defaults.copy()
|
||||
policy_defaults.update({
|
||||
'utf8': False,
|
||||
'raise_on_defect': False,
|
||||
'header_factory': email.policy.EmailPolicy.header_factory,
|
||||
'refold_source': 'long',
|
||||
'content_manager': email.policy.EmailPolicy.content_manager,
|
||||
'mangle_from_': False,
|
||||
'message_factory': email.message.EmailMessage,
|
||||
})
|
||||
|
||||
# For each policy under test, we give here what we expect the defaults to
|
||||
# be for that policy. The second argument to make defaults is the
|
||||
# difference between the base defaults and that for the particular policy.
|
||||
new_policy = email.policy.EmailPolicy()
|
||||
policies = {
|
||||
email.policy.compat32: make_defaults(compat32_defaults, {}),
|
||||
email.policy.default: make_defaults(policy_defaults, {}),
|
||||
email.policy.SMTP: make_defaults(policy_defaults,
|
||||
{'linesep': '\r\n'}),
|
||||
email.policy.SMTPUTF8: make_defaults(policy_defaults,
|
||||
{'linesep': '\r\n',
|
||||
'utf8': True}),
|
||||
email.policy.HTTP: make_defaults(policy_defaults,
|
||||
{'linesep': '\r\n',
|
||||
'max_line_length': None}),
|
||||
email.policy.strict: make_defaults(policy_defaults,
|
||||
{'raise_on_defect': True}),
|
||||
new_policy: make_defaults(policy_defaults, {}),
|
||||
}
|
||||
# Creating a new policy creates a new header factory. There is a test
|
||||
# later that proves this.
|
||||
policies[new_policy]['header_factory'] = new_policy.header_factory
|
||||
|
||||
def test_defaults(self):
|
||||
for policy, expected in self.policies.items():
|
||||
for attr, value in expected.items():
|
||||
with self.subTest(policy=policy, attr=attr):
|
||||
self.assertEqual(getattr(policy, attr), value,
|
||||
("change {} docs/docstrings if defaults have "
|
||||
"changed").format(policy))
|
||||
|
||||
def test_all_attributes_covered(self):
|
||||
for policy, expected in self.policies.items():
|
||||
for attr in dir(policy):
|
||||
with self.subTest(policy=policy, attr=attr):
|
||||
if (attr.startswith('_') or
|
||||
isinstance(getattr(email.policy.EmailPolicy, attr),
|
||||
types.FunctionType)):
|
||||
continue
|
||||
else:
|
||||
self.assertIn(attr, expected,
|
||||
"{} is not fully tested".format(attr))
|
||||
|
||||
def test_abc(self):
|
||||
with self.assertRaises(TypeError) as cm:
|
||||
email.policy.Policy()
|
||||
msg = str(cm.exception)
|
||||
abstract_methods = ('fold',
|
||||
'fold_binary',
|
||||
'header_fetch_parse',
|
||||
'header_source_parse',
|
||||
'header_store_parse')
|
||||
for method in abstract_methods:
|
||||
self.assertIn(method, msg)
|
||||
|
||||
def test_policy_is_immutable(self):
|
||||
for policy, defaults in self.policies.items():
|
||||
for attr in defaults:
|
||||
with self.assertRaisesRegex(AttributeError, attr+".*read-only"):
|
||||
setattr(policy, attr, None)
|
||||
with self.assertRaisesRegex(AttributeError, 'no attribute.*foo'):
|
||||
policy.foo = None
|
||||
|
||||
def test_set_policy_attrs_when_cloned(self):
|
||||
# None of the attributes has a default value of None, so we set them
|
||||
# all to None in the clone call and check that it worked.
|
||||
for policyclass, defaults in self.policies.items():
|
||||
testattrdict = {attr: None for attr in defaults}
|
||||
policy = policyclass.clone(**testattrdict)
|
||||
for attr in defaults:
|
||||
self.assertIsNone(getattr(policy, attr))
|
||||
|
||||
def test_reject_non_policy_keyword_when_called(self):
|
||||
for policyclass in self.policies:
|
||||
with self.assertRaises(TypeError):
|
||||
policyclass(this_keyword_should_not_be_valid=None)
|
||||
with self.assertRaises(TypeError):
|
||||
policyclass(newtline=None)
|
||||
|
||||
def test_policy_addition(self):
|
||||
expected = self.policy_defaults.copy()
|
||||
p1 = email.policy.default.clone(max_line_length=100)
|
||||
p2 = email.policy.default.clone(max_line_length=50)
|
||||
added = p1 + p2
|
||||
expected.update(max_line_length=50)
|
||||
for attr, value in expected.items():
|
||||
self.assertEqual(getattr(added, attr), value)
|
||||
added = p2 + p1
|
||||
expected.update(max_line_length=100)
|
||||
for attr, value in expected.items():
|
||||
self.assertEqual(getattr(added, attr), value)
|
||||
added = added + email.policy.default
|
||||
for attr, value in expected.items():
|
||||
self.assertEqual(getattr(added, attr), value)
|
||||
|
||||
def test_fold_utf8(self):
|
||||
expected_ascii = 'Subject: =?utf-8?q?=C3=A1?=\n'
|
||||
expected_utf8 = 'Subject: á\n'
|
||||
|
||||
msg = email.message.EmailMessage()
|
||||
s = 'á'
|
||||
msg['Subject'] = s
|
||||
|
||||
p_ascii = email.policy.default.clone()
|
||||
p_utf8 = email.policy.default.clone(utf8=True)
|
||||
|
||||
self.assertEqual(p_ascii.fold('Subject', msg['Subject']), expected_ascii)
|
||||
self.assertEqual(p_utf8.fold('Subject', msg['Subject']), expected_utf8)
|
||||
|
||||
self.assertEqual(p_ascii.fold('Subject', s), expected_ascii)
|
||||
self.assertEqual(p_utf8.fold('Subject', s), expected_utf8)
|
||||
|
||||
def test_fold_zero_max_line_length(self):
|
||||
expected = 'Subject: =?utf-8?q?=C3=A1?=\n'
|
||||
|
||||
msg = email.message.EmailMessage()
|
||||
msg['Subject'] = 'á'
|
||||
|
||||
p1 = email.policy.default.clone(max_line_length=0)
|
||||
p2 = email.policy.default.clone(max_line_length=None)
|
||||
|
||||
self.assertEqual(p1.fold('Subject', msg['Subject']), expected)
|
||||
self.assertEqual(p2.fold('Subject', msg['Subject']), expected)
|
||||
|
||||
def test_register_defect(self):
|
||||
class Dummy:
|
||||
def __init__(self):
|
||||
self.defects = []
|
||||
obj = Dummy()
|
||||
defect = object()
|
||||
policy = email.policy.EmailPolicy()
|
||||
policy.register_defect(obj, defect)
|
||||
self.assertEqual(obj.defects, [defect])
|
||||
defect2 = object()
|
||||
policy.register_defect(obj, defect2)
|
||||
self.assertEqual(obj.defects, [defect, defect2])
|
||||
|
||||
class MyObj:
|
||||
def __init__(self):
|
||||
self.defects = []
|
||||
|
||||
class MyDefect(Exception):
|
||||
pass
|
||||
|
||||
def test_handle_defect_raises_on_strict(self):
|
||||
foo = self.MyObj()
|
||||
defect = self.MyDefect("the telly is broken")
|
||||
with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
|
||||
email.policy.strict.handle_defect(foo, defect)
|
||||
|
||||
def test_handle_defect_registers_defect(self):
|
||||
foo = self.MyObj()
|
||||
defect1 = self.MyDefect("one")
|
||||
email.policy.default.handle_defect(foo, defect1)
|
||||
self.assertEqual(foo.defects, [defect1])
|
||||
defect2 = self.MyDefect("two")
|
||||
email.policy.default.handle_defect(foo, defect2)
|
||||
self.assertEqual(foo.defects, [defect1, defect2])
|
||||
|
||||
class MyPolicy(email.policy.EmailPolicy):
|
||||
defects = None
|
||||
def __init__(self, *args, **kw):
|
||||
super().__init__(*args, defects=[], **kw)
|
||||
def register_defect(self, obj, defect):
|
||||
self.defects.append(defect)
|
||||
|
||||
def test_overridden_register_defect_still_raises(self):
|
||||
foo = self.MyObj()
|
||||
defect = self.MyDefect("the telly is broken")
|
||||
with self.assertRaisesRegex(self.MyDefect, "the telly is broken"):
|
||||
self.MyPolicy(raise_on_defect=True).handle_defect(foo, defect)
|
||||
|
||||
def test_overridden_register_defect_works(self):
|
||||
foo = self.MyObj()
|
||||
defect1 = self.MyDefect("one")
|
||||
my_policy = self.MyPolicy()
|
||||
my_policy.handle_defect(foo, defect1)
|
||||
self.assertEqual(my_policy.defects, [defect1])
|
||||
self.assertEqual(foo.defects, [])
|
||||
defect2 = self.MyDefect("two")
|
||||
my_policy.handle_defect(foo, defect2)
|
||||
self.assertEqual(my_policy.defects, [defect1, defect2])
|
||||
self.assertEqual(foo.defects, [])
|
||||
|
||||
def test_default_header_factory(self):
|
||||
h = email.policy.default.header_factory('Test', 'test')
|
||||
self.assertEqual(h.name, 'Test')
|
||||
self.assertIsInstance(h, headerregistry.UnstructuredHeader)
|
||||
self.assertIsInstance(h, headerregistry.BaseHeader)
|
||||
|
||||
class Foo:
|
||||
parse = headerregistry.UnstructuredHeader.parse
|
||||
|
||||
def test_each_Policy_gets_unique_factory(self):
|
||||
policy1 = email.policy.EmailPolicy()
|
||||
policy2 = email.policy.EmailPolicy()
|
||||
policy1.header_factory.map_to_type('foo', self.Foo)
|
||||
h = policy1.header_factory('foo', 'test')
|
||||
self.assertIsInstance(h, self.Foo)
|
||||
self.assertNotIsInstance(h, headerregistry.UnstructuredHeader)
|
||||
h = policy2.header_factory('foo', 'test')
|
||||
self.assertNotIsInstance(h, self.Foo)
|
||||
self.assertIsInstance(h, headerregistry.UnstructuredHeader)
|
||||
|
||||
def test_clone_copies_factory(self):
|
||||
policy1 = email.policy.EmailPolicy()
|
||||
policy2 = policy1.clone()
|
||||
policy1.header_factory.map_to_type('foo', self.Foo)
|
||||
h = policy1.header_factory('foo', 'test')
|
||||
self.assertIsInstance(h, self.Foo)
|
||||
h = policy2.header_factory('foo', 'test')
|
||||
self.assertIsInstance(h, self.Foo)
|
||||
|
||||
def test_new_factory_overrides_default(self):
|
||||
mypolicy = email.policy.EmailPolicy()
|
||||
myfactory = mypolicy.header_factory
|
||||
newpolicy = mypolicy + email.policy.strict
|
||||
self.assertEqual(newpolicy.header_factory, myfactory)
|
||||
newpolicy = email.policy.strict + mypolicy
|
||||
self.assertEqual(newpolicy.header_factory, myfactory)
|
||||
|
||||
def test_adding_default_policies_preserves_default_factory(self):
|
||||
newpolicy = email.policy.default + email.policy.strict
|
||||
self.assertEqual(newpolicy.header_factory,
|
||||
email.policy.EmailPolicy.header_factory)
|
||||
self.assertEqual(newpolicy.__dict__, {'raise_on_defect': True})
|
||||
|
||||
def test_non_ascii_chars_do_not_cause_inf_loop(self):
|
||||
policy = email.policy.default.clone(max_line_length=20)
|
||||
actual = policy.fold('Subject', 'ą' * 12)
|
||||
self.assertEqual(
|
||||
actual,
|
||||
'Subject: \n' +
|
||||
12 * ' =?utf-8?q?=C4=85?=\n')
|
||||
|
||||
def test_short_maxlen_error(self):
|
||||
# RFC 2047 chrome takes up 7 characters, plus the length of the charset
|
||||
# name, so folding should fail if maxlen is lower than the minimum
|
||||
# required length for a line.
|
||||
|
||||
# Note: This is only triggered when there is a single word longer than
|
||||
# max_line_length, hence the 1234567890 at the end of this whimsical
|
||||
# subject. This is because when we encounter a word longer than
|
||||
# max_line_length, it is broken down into encoded words to fit
|
||||
# max_line_length. If the max_line_length isn't large enough to even
|
||||
# contain the RFC 2047 chrome (`?=<charset>?q??=`), we fail.
|
||||
subject = "Melt away the pounds with this one simple trick! 1234567890"
|
||||
|
||||
for maxlen in [3, 7, 9]:
|
||||
with self.subTest(maxlen=maxlen):
|
||||
policy = email.policy.default.clone(max_line_length=maxlen)
|
||||
with self.assertRaises(email.errors.HeaderParseError):
|
||||
policy.fold("Subject", subject)
|
||||
|
||||
def test_verify_generated_headers(self):
|
||||
"""Turning protection off allows header injection"""
|
||||
policy = email.policy.default.clone(verify_generated_headers=False)
|
||||
for text in (
|
||||
'Header: Value\r\nBad: Injection\r\n',
|
||||
'Header: NoNewLine'
|
||||
):
|
||||
with self.subTest(text=text):
|
||||
message = email.message_from_string(
|
||||
"Header: Value\r\n\r\nBody",
|
||||
policy=policy,
|
||||
)
|
||||
class LiteralHeader(str):
|
||||
name = 'Header'
|
||||
def fold(self, **kwargs):
|
||||
return self
|
||||
|
||||
del message['Header']
|
||||
message['Header'] = LiteralHeader(text)
|
||||
|
||||
self.assertEqual(
|
||||
message.as_string(),
|
||||
f"{text}\nBody",
|
||||
)
|
||||
|
||||
# XXX: Need subclassing tests.
|
||||
# For adding subclassed objects, make sure the usual rules apply (subclass
|
||||
# wins), but that the order still works (right overrides left).
|
||||
|
||||
|
||||
class TestException(Exception):
|
||||
pass
|
||||
|
||||
class TestPolicyPropagation(unittest.TestCase):
|
||||
|
||||
# The abstract methods are used by the parser but not by the wrapper
|
||||
# functions that call it, so if the exception gets raised we know that the
|
||||
# policy was actually propagated all the way to feedparser.
|
||||
class MyPolicy(email.policy.Policy):
|
||||
def badmethod(self, *args, **kw):
|
||||
raise TestException("test")
|
||||
fold = fold_binary = header_fetch_parser = badmethod
|
||||
header_source_parse = header_store_parse = badmethod
|
||||
|
||||
def test_message_from_string(self):
|
||||
with self.assertRaisesRegex(TestException, "^test$"):
|
||||
email.message_from_string("Subject: test\n\n",
|
||||
policy=self.MyPolicy)
|
||||
|
||||
def test_message_from_bytes(self):
|
||||
with self.assertRaisesRegex(TestException, "^test$"):
|
||||
email.message_from_bytes(b"Subject: test\n\n",
|
||||
policy=self.MyPolicy)
|
||||
|
||||
def test_message_from_file(self):
|
||||
f = io.StringIO('Subject: test\n\n')
|
||||
with self.assertRaisesRegex(TestException, "^test$"):
|
||||
email.message_from_file(f, policy=self.MyPolicy)
|
||||
|
||||
def test_message_from_binary_file(self):
|
||||
f = io.BytesIO(b'Subject: test\n\n')
|
||||
with self.assertRaisesRegex(TestException, "^test$"):
|
||||
email.message_from_binary_file(f, policy=self.MyPolicy)
|
||||
|
||||
# These are redundant, but we need them for black-box completeness.
|
||||
|
||||
def test_parser(self):
|
||||
p = email.parser.Parser(policy=self.MyPolicy)
|
||||
with self.assertRaisesRegex(TestException, "^test$"):
|
||||
p.parsestr('Subject: test\n\n')
|
||||
|
||||
def test_bytes_parser(self):
|
||||
p = email.parser.BytesParser(policy=self.MyPolicy)
|
||||
with self.assertRaisesRegex(TestException, "^test$"):
|
||||
p.parsebytes(b'Subject: test\n\n')
|
||||
|
||||
# Now that we've established that all the parse methods get the
|
||||
# policy in to feedparser, we can use message_from_string for
|
||||
# the rest of the propagation tests.
|
||||
|
||||
def _make_msg(self, source='Subject: test\n\n', policy=None):
|
||||
self.policy = email.policy.default.clone() if policy is None else policy
|
||||
return email.message_from_string(source, policy=self.policy)
|
||||
|
||||
def test_parser_propagates_policy_to_message(self):
|
||||
msg = self._make_msg()
|
||||
self.assertIs(msg.policy, self.policy)
|
||||
|
||||
def test_parser_propagates_policy_to_sub_messages(self):
|
||||
msg = self._make_msg(textwrap.dedent("""\
|
||||
Subject: mime test
|
||||
MIME-Version: 1.0
|
||||
Content-Type: multipart/mixed, boundary="XXX"
|
||||
|
||||
--XXX
|
||||
Content-Type: text/plain
|
||||
|
||||
test
|
||||
--XXX
|
||||
Content-Type: text/plain
|
||||
|
||||
test2
|
||||
--XXX--
|
||||
"""))
|
||||
for part in msg.walk():
|
||||
self.assertIs(part.policy, self.policy)
|
||||
|
||||
def test_message_policy_propagates_to_generator(self):
|
||||
msg = self._make_msg("Subject: test\nTo: foo\n\n",
|
||||
policy=email.policy.default.clone(linesep='X'))
|
||||
s = io.StringIO()
|
||||
g = email.generator.Generator(s)
|
||||
g.flatten(msg)
|
||||
self.assertEqual(s.getvalue(), "Subject: testXTo: fooXX")
|
||||
|
||||
def test_message_policy_used_by_as_string(self):
|
||||
msg = self._make_msg("Subject: test\nTo: foo\n\n",
|
||||
policy=email.policy.default.clone(linesep='X'))
|
||||
self.assertEqual(msg.as_string(), "Subject: testXTo: fooXX")
|
||||
|
||||
|
||||
class TestConcretePolicies(unittest.TestCase):
|
||||
|
||||
def test_header_store_parse_rejects_newlines(self):
|
||||
instance = email.policy.EmailPolicy()
|
||||
self.assertRaises(ValueError,
|
||||
instance.header_store_parse,
|
||||
'From', 'spam\negg@foo.py')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
186
bin/python/python-3.13/Lib/test/test_email/test_utils.py
Normal file
186
bin/python/python-3.13/Lib/test/test_email/test_utils.py
Normal file
@@ -0,0 +1,186 @@
|
||||
import datetime
|
||||
from email import utils
|
||||
import test.support
|
||||
import time
|
||||
import unittest
|
||||
import sys
|
||||
import os.path
|
||||
import zoneinfo
|
||||
|
||||
class DateTimeTests(unittest.TestCase):
|
||||
|
||||
datestring = 'Sun, 23 Sep 2001 20:10:55'
|
||||
dateargs = (2001, 9, 23, 20, 10, 55)
|
||||
offsetstring = ' -0700'
|
||||
utcoffset = datetime.timedelta(hours=-7)
|
||||
tz = datetime.timezone(utcoffset)
|
||||
naive_dt = datetime.datetime(*dateargs)
|
||||
aware_dt = datetime.datetime(*dateargs, tzinfo=tz)
|
||||
|
||||
def test_naive_datetime(self):
|
||||
self.assertEqual(utils.format_datetime(self.naive_dt),
|
||||
self.datestring + ' -0000')
|
||||
|
||||
def test_aware_datetime(self):
|
||||
self.assertEqual(utils.format_datetime(self.aware_dt),
|
||||
self.datestring + self.offsetstring)
|
||||
|
||||
def test_usegmt(self):
|
||||
utc_dt = datetime.datetime(*self.dateargs,
|
||||
tzinfo=datetime.timezone.utc)
|
||||
self.assertEqual(utils.format_datetime(utc_dt, usegmt=True),
|
||||
self.datestring + ' GMT')
|
||||
|
||||
def test_usegmt_with_naive_datetime_raises(self):
|
||||
with self.assertRaises(ValueError):
|
||||
utils.format_datetime(self.naive_dt, usegmt=True)
|
||||
|
||||
def test_usegmt_with_non_utc_datetime_raises(self):
|
||||
with self.assertRaises(ValueError):
|
||||
utils.format_datetime(self.aware_dt, usegmt=True)
|
||||
|
||||
def test_parsedate_to_datetime(self):
|
||||
self.assertEqual(
|
||||
utils.parsedate_to_datetime(self.datestring + self.offsetstring),
|
||||
self.aware_dt)
|
||||
|
||||
def test_parsedate_to_datetime_naive(self):
|
||||
self.assertEqual(
|
||||
utils.parsedate_to_datetime(self.datestring + ' -0000'),
|
||||
self.naive_dt)
|
||||
|
||||
def test_parsedate_to_datetime_with_invalid_raises_valueerror(self):
|
||||
# See also test_parsedate_returns_None_for_invalid_strings in test_email.
|
||||
invalid_dates = [
|
||||
'',
|
||||
' ',
|
||||
'0',
|
||||
'A Complete Waste of Time',
|
||||
'Wed, 3 Apr 2002 12.34.56.78+0800'
|
||||
'Tue, 06 Jun 2017 27:39:33 +0600',
|
||||
'Tue, 06 Jun 2017 07:39:33 +2600',
|
||||
'Tue, 06 Jun 2017 27:39:33',
|
||||
'17 June , 2022',
|
||||
'Friday, -Nov-82 16:14:55 EST',
|
||||
'Friday, Nov--82 16:14:55 EST',
|
||||
'Friday, 19-Nov- 16:14:55 EST',
|
||||
]
|
||||
for dtstr in invalid_dates:
|
||||
with self.subTest(dtstr=dtstr):
|
||||
self.assertRaises(ValueError, utils.parsedate_to_datetime, dtstr)
|
||||
|
||||
class LocaltimeTests(unittest.TestCase):
|
||||
|
||||
def test_localtime_is_tz_aware_daylight_true(self):
|
||||
test.support.patch(self, time, 'daylight', True)
|
||||
t = utils.localtime()
|
||||
self.assertIsNotNone(t.tzinfo)
|
||||
|
||||
def test_localtime_is_tz_aware_daylight_false(self):
|
||||
test.support.patch(self, time, 'daylight', False)
|
||||
t = utils.localtime()
|
||||
self.assertIsNotNone(t.tzinfo)
|
||||
|
||||
def test_localtime_daylight_true_dst_false(self):
|
||||
test.support.patch(self, time, 'daylight', True)
|
||||
t0 = datetime.datetime(2012, 3, 12, 1, 1)
|
||||
t1 = utils.localtime(t0)
|
||||
t2 = utils.localtime(t1)
|
||||
self.assertEqual(t1, t2)
|
||||
|
||||
def test_localtime_daylight_false_dst_false(self):
|
||||
test.support.patch(self, time, 'daylight', False)
|
||||
t0 = datetime.datetime(2012, 3, 12, 1, 1)
|
||||
t1 = utils.localtime(t0)
|
||||
t2 = utils.localtime(t1)
|
||||
self.assertEqual(t1, t2)
|
||||
|
||||
@test.support.run_with_tz('Europe/Minsk')
|
||||
def test_localtime_daylight_true_dst_true(self):
|
||||
test.support.patch(self, time, 'daylight', True)
|
||||
t0 = datetime.datetime(2012, 3, 12, 1, 1)
|
||||
t1 = utils.localtime(t0)
|
||||
t2 = utils.localtime(t1)
|
||||
self.assertEqual(t1, t2)
|
||||
|
||||
@test.support.run_with_tz('Europe/Minsk')
|
||||
def test_localtime_daylight_false_dst_true(self):
|
||||
test.support.patch(self, time, 'daylight', False)
|
||||
t0 = datetime.datetime(2012, 3, 12, 1, 1)
|
||||
t1 = utils.localtime(t0)
|
||||
t2 = utils.localtime(t1)
|
||||
self.assertEqual(t1, t2)
|
||||
|
||||
@test.support.run_with_tz('EST+05EDT,M3.2.0,M11.1.0')
|
||||
def test_localtime_epoch_utc_daylight_true(self):
|
||||
test.support.patch(self, time, 'daylight', True)
|
||||
t0 = datetime.datetime(1990, 1, 1, tzinfo = datetime.timezone.utc)
|
||||
t1 = utils.localtime(t0)
|
||||
t2 = t0 - datetime.timedelta(hours=5)
|
||||
t2 = t2.replace(tzinfo = datetime.timezone(datetime.timedelta(hours=-5)))
|
||||
self.assertEqual(t1, t2)
|
||||
|
||||
@test.support.run_with_tz('EST+05EDT,M3.2.0,M11.1.0')
|
||||
def test_localtime_epoch_utc_daylight_false(self):
|
||||
test.support.patch(self, time, 'daylight', False)
|
||||
t0 = datetime.datetime(1990, 1, 1, tzinfo = datetime.timezone.utc)
|
||||
t1 = utils.localtime(t0)
|
||||
t2 = t0 - datetime.timedelta(hours=5)
|
||||
t2 = t2.replace(tzinfo = datetime.timezone(datetime.timedelta(hours=-5)))
|
||||
self.assertEqual(t1, t2)
|
||||
|
||||
def test_localtime_epoch_notz_daylight_true(self):
|
||||
test.support.patch(self, time, 'daylight', True)
|
||||
t0 = datetime.datetime(1990, 1, 1)
|
||||
t1 = utils.localtime(t0)
|
||||
t2 = utils.localtime(t0.replace(tzinfo=None))
|
||||
self.assertEqual(t1, t2)
|
||||
|
||||
def test_localtime_epoch_notz_daylight_false(self):
|
||||
test.support.patch(self, time, 'daylight', False)
|
||||
t0 = datetime.datetime(1990, 1, 1)
|
||||
t1 = utils.localtime(t0)
|
||||
t2 = utils.localtime(t0.replace(tzinfo=None))
|
||||
self.assertEqual(t1, t2)
|
||||
|
||||
@test.support.run_with_tz('Europe/Kyiv')
|
||||
def test_variable_tzname(self):
|
||||
t0 = datetime.datetime(1984, 1, 1, tzinfo=datetime.timezone.utc)
|
||||
t1 = utils.localtime(t0)
|
||||
if t1.tzname() in ('Europe', 'UTC'):
|
||||
self.skipTest("Can't find a Kyiv timezone database")
|
||||
self.assertEqual(t1.tzname(), 'MSK')
|
||||
t0 = datetime.datetime(1994, 1, 1, tzinfo=datetime.timezone.utc)
|
||||
t1 = utils.localtime(t0)
|
||||
self.assertEqual(t1.tzname(), 'EET')
|
||||
|
||||
def test_isdst_deprecation(self):
|
||||
with self.assertWarns(DeprecationWarning):
|
||||
t0 = datetime.datetime(1990, 1, 1)
|
||||
t1 = utils.localtime(t0, isdst=True)
|
||||
|
||||
# Issue #24836: The timezone files are out of date (pre 2011k)
|
||||
# on Mac OS X Snow Leopard.
|
||||
@test.support.requires_mac_ver(10, 7)
|
||||
class FormatDateTests(unittest.TestCase):
|
||||
|
||||
@test.support.run_with_tz('Europe/Minsk')
|
||||
def test_formatdate(self):
|
||||
timeval = time.mktime((2011, 12, 1, 18, 0, 0, 4, 335, 0))
|
||||
string = utils.formatdate(timeval, localtime=False, usegmt=False)
|
||||
self.assertEqual(string, 'Thu, 01 Dec 2011 15:00:00 -0000')
|
||||
string = utils.formatdate(timeval, localtime=False, usegmt=True)
|
||||
self.assertEqual(string, 'Thu, 01 Dec 2011 15:00:00 GMT')
|
||||
|
||||
@test.support.run_with_tz('Europe/Minsk')
|
||||
def test_formatdate_with_localtime(self):
|
||||
timeval = time.mktime((2011, 1, 1, 18, 0, 0, 6, 1, 0))
|
||||
string = utils.formatdate(timeval, localtime=True)
|
||||
self.assertEqual(string, 'Sat, 01 Jan 2011 18:00:00 +0200')
|
||||
# Minsk moved from +0200 (with DST) to +0300 (without DST) in 2011
|
||||
timeval = time.mktime((2011, 12, 1, 18, 0, 0, 4, 335, 0))
|
||||
string = utils.formatdate(timeval, localtime=True)
|
||||
self.assertEqual(string, 'Thu, 01 Dec 2011 18:00:00 +0300')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
127
bin/python/python-3.13/Lib/test/test_email/torture_test.py
Normal file
127
bin/python/python-3.13/Lib/test/test_email/torture_test.py
Normal file
@@ -0,0 +1,127 @@
|
||||
# Copyright (C) 2002-2004 Python Software Foundation
|
||||
#
|
||||
# A torture test of the email package. This should not be run as part of the
|
||||
# standard Python test suite since it requires several meg of email messages
|
||||
# collected in the wild. These source messages are not checked into the
|
||||
# Python distro, but are available as part of the standalone email package at
|
||||
# http://sf.net/projects/mimelib
|
||||
|
||||
import sys
|
||||
import os
|
||||
import unittest
|
||||
from io import StringIO
|
||||
|
||||
from test.test_email import TestEmailBase
|
||||
|
||||
import email
|
||||
from email import __file__ as testfile
|
||||
from email.iterators import _structure
|
||||
|
||||
def openfile(filename):
|
||||
from os.path import join, dirname, abspath
|
||||
path = abspath(join(dirname(testfile), os.pardir, 'moredata', filename))
|
||||
return open(path, 'r')
|
||||
|
||||
# Prevent this test from running in the Python distro
|
||||
def setUpModule():
|
||||
try:
|
||||
openfile('crispin-torture.txt')
|
||||
except OSError:
|
||||
raise unittest.SkipTest
|
||||
|
||||
|
||||
|
||||
class TortureBase(TestEmailBase):
|
||||
def _msgobj(self, filename):
|
||||
fp = openfile(filename)
|
||||
try:
|
||||
msg = email.message_from_file(fp)
|
||||
finally:
|
||||
fp.close()
|
||||
return msg
|
||||
|
||||
|
||||
|
||||
class TestCrispinTorture(TortureBase):
|
||||
# Mark Crispin's torture test from the SquirrelMail project
|
||||
def test_mondo_message(self):
|
||||
eq = self.assertEqual
|
||||
neq = self.ndiffAssertEqual
|
||||
msg = self._msgobj('crispin-torture.txt')
|
||||
payload = msg.get_payload()
|
||||
eq(type(payload), list)
|
||||
eq(len(payload), 12)
|
||||
eq(msg.preamble, None)
|
||||
eq(msg.epilogue, '\n')
|
||||
# Probably the best way to verify the message is parsed correctly is to
|
||||
# dump its structure and compare it against the known structure.
|
||||
fp = StringIO()
|
||||
_structure(msg, fp=fp)
|
||||
neq(fp.getvalue(), """\
|
||||
multipart/mixed
|
||||
text/plain
|
||||
message/rfc822
|
||||
multipart/alternative
|
||||
text/plain
|
||||
multipart/mixed
|
||||
text/richtext
|
||||
application/andrew-inset
|
||||
message/rfc822
|
||||
audio/basic
|
||||
audio/basic
|
||||
image/pbm
|
||||
message/rfc822
|
||||
multipart/mixed
|
||||
multipart/mixed
|
||||
text/plain
|
||||
audio/x-sun
|
||||
multipart/mixed
|
||||
image/gif
|
||||
image/gif
|
||||
application/x-be2
|
||||
application/atomicmail
|
||||
audio/x-sun
|
||||
message/rfc822
|
||||
multipart/mixed
|
||||
text/plain
|
||||
image/pgm
|
||||
text/plain
|
||||
message/rfc822
|
||||
multipart/mixed
|
||||
text/plain
|
||||
image/pbm
|
||||
message/rfc822
|
||||
application/postscript
|
||||
image/gif
|
||||
message/rfc822
|
||||
multipart/mixed
|
||||
audio/basic
|
||||
audio/basic
|
||||
message/rfc822
|
||||
multipart/mixed
|
||||
application/postscript
|
||||
text/plain
|
||||
message/rfc822
|
||||
multipart/mixed
|
||||
text/plain
|
||||
multipart/parallel
|
||||
image/gif
|
||||
audio/basic
|
||||
application/atomicmail
|
||||
message/rfc822
|
||||
audio/x-sun
|
||||
""")
|
||||
|
||||
def _testclasses():
|
||||
mod = sys.modules[__name__]
|
||||
return [getattr(mod, name) for name in dir(mod) if name.startswith('Test')]
|
||||
|
||||
|
||||
def load_tests(loader, tests, pattern):
|
||||
suite = loader.suiteClass()
|
||||
for testclass in _testclasses():
|
||||
suite.addTest(loader.loadTestsFromTestCase(testclass))
|
||||
return suite
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user