# base.py
from __future__ import print_function, absolute_import

import io
import os
import re
import shutil
import sys
import tempfile
import unittest
import warnings
from textwrap import dedent

from future.utils import bind_method, PY26, PY3, PY2, PY27
from future.moves.subprocess import check_output, STDOUT, CalledProcessError

if PY26:
    import unittest2 as unittest
  14. def reformat_code(code):
  15. """
  16. Removes any leading \n and dedents.
  17. """
  18. if code.startswith('\n'):
  19. code = code[1:]
  20. return dedent(code)
  21. def order_future_lines(code):
  22. """
  23. Returns the code block with any ``__future__`` import lines sorted, and
  24. then any ``future`` import lines sorted, then any ``builtins`` import lines
  25. sorted.
  26. This only sorts the lines within the expected blocks.
  27. See test_order_future_lines() for an example.
  28. """
  29. # We need .splitlines(keepends=True), which doesn't exist on Py2,
  30. # so we use this instead:
  31. lines = code.split('\n')
  32. uufuture_line_numbers = [i for i, line in enumerate(lines)
  33. if line.startswith('from __future__ import ')]
  34. future_line_numbers = [i for i, line in enumerate(lines)
  35. if line.startswith('from future')
  36. or line.startswith('from past')]
  37. builtins_line_numbers = [i for i, line in enumerate(lines)
  38. if line.startswith('from builtins')]
  39. assert code.lstrip() == code, ('internal usage error: '
  40. 'dedent the code before calling order_future_lines()')
  41. def mymax(numbers):
  42. return max(numbers) if len(numbers) > 0 else 0
  43. def mymin(numbers):
  44. return min(numbers) if len(numbers) > 0 else float('inf')
  45. assert mymax(uufuture_line_numbers) <= mymin(future_line_numbers), \
  46. 'the __future__ and future imports are out of order'
  47. # assert mymax(future_line_numbers) <= mymin(builtins_line_numbers), \
  48. # 'the future and builtins imports are out of order'
  49. uul = sorted([lines[i] for i in uufuture_line_numbers])
  50. sorted_uufuture_lines = dict(zip(uufuture_line_numbers, uul))
  51. fl = sorted([lines[i] for i in future_line_numbers])
  52. sorted_future_lines = dict(zip(future_line_numbers, fl))
  53. bl = sorted([lines[i] for i in builtins_line_numbers])
  54. sorted_builtins_lines = dict(zip(builtins_line_numbers, bl))
  55. # Replace the old unsorted "from __future__ import ..." lines with the
  56. # new sorted ones:
  57. new_lines = []
  58. for i in range(len(lines)):
  59. if i in uufuture_line_numbers:
  60. new_lines.append(sorted_uufuture_lines[i])
  61. elif i in future_line_numbers:
  62. new_lines.append(sorted_future_lines[i])
  63. elif i in builtins_line_numbers:
  64. new_lines.append(sorted_builtins_lines[i])
  65. else:
  66. new_lines.append(lines[i])
  67. return '\n'.join(new_lines)
  68. class VerboseCalledProcessError(CalledProcessError):
  69. """
  70. Like CalledProcessError, but it displays more information (message and
  71. script output) for diagnosing test failures etc.
  72. """
  73. def __init__(self, msg, returncode, cmd, output=None):
  74. self.msg = msg
  75. self.returncode = returncode
  76. self.cmd = cmd
  77. self.output = output
  78. def __str__(self):
  79. return ("Command '%s' failed with exit status %d\nMessage: %s\nOutput: %s"
  80. % (self.cmd, self.returncode, self.msg, self.output))
  81. class FuturizeError(VerboseCalledProcessError):
  82. pass
  83. class PasteurizeError(VerboseCalledProcessError):
  84. pass
  85. class CodeHandler(unittest.TestCase):
  86. """
  87. Handy mixin for test classes for writing / reading / futurizing /
  88. running .py files in the test suite.
  89. """
  90. def setUp(self):
  91. """
  92. The outputs from the various futurize stages should have the
  93. following headers:
  94. """
  95. # After stage1:
  96. # TODO: use this form after implementing a fixer to consolidate
  97. # __future__ imports into a single line:
  98. # self.headers1 = """
  99. # from __future__ import absolute_import, division, print_function
  100. # """
  101. self.headers1 = reformat_code("""
  102. from __future__ import absolute_import
  103. from __future__ import division
  104. from __future__ import print_function
  105. """)
  106. # After stage2 --all-imports:
  107. # TODO: use this form after implementing a fixer to consolidate
  108. # __future__ imports into a single line:
  109. # self.headers2 = """
  110. # from __future__ import (absolute_import, division,
  111. # print_function, unicode_literals)
  112. # from future import standard_library
  113. # from future.builtins import *
  114. # """
  115. self.headers2 = reformat_code("""
  116. from __future__ import absolute_import
  117. from __future__ import division
  118. from __future__ import print_function
  119. from __future__ import unicode_literals
  120. from future import standard_library
  121. standard_library.install_aliases()
  122. from builtins import *
  123. """)
  124. self.interpreters = [sys.executable]
  125. self.tempdir = tempfile.mkdtemp() + os.path.sep
  126. pypath = os.getenv('PYTHONPATH')
  127. if pypath:
  128. self.env = {'PYTHONPATH': os.getcwd() + os.pathsep + pypath}
  129. else:
  130. self.env = {'PYTHONPATH': os.getcwd()}
  131. def convert(self, code, stages=(1, 2), all_imports=False, from3=False,
  132. reformat=True, run=True, conservative=False):
  133. """
  134. Converts the code block using ``futurize`` and returns the
  135. resulting code.
  136. Passing stages=[1] or stages=[2] passes the flag ``--stage1`` or
  137. ``stage2`` to ``futurize``. Passing both stages runs ``futurize``
  138. with both stages by default.
  139. If from3 is False, runs ``futurize``, converting from Python 2 to
  140. both 2 and 3. If from3 is True, runs ``pasteurize`` to convert
  141. from Python 3 to both 2 and 3.
  142. Optionally reformats the code block first using the reformat() function.
  143. If run is True, runs the resulting code under all Python
  144. interpreters in self.interpreters.
  145. """
  146. if reformat:
  147. code = reformat_code(code)
  148. self._write_test_script(code)
  149. self._futurize_test_script(stages=stages, all_imports=all_imports,
  150. from3=from3, conservative=conservative)
  151. output = self._read_test_script()
  152. if run:
  153. for interpreter in self.interpreters:
  154. _ = self._run_test_script(interpreter=interpreter)
  155. return output
  156. def compare(self, output, expected, ignore_imports=True):
  157. """
  158. Compares whether the code blocks are equal. If not, raises an
  159. exception so the test fails. Ignores any trailing whitespace like
  160. blank lines.
  161. If ignore_imports is True, passes the code blocks into the
  162. strip_future_imports method.
  163. If one code block is a unicode string and the other a
  164. byte-string, it assumes the byte-string is encoded as utf-8.
  165. """
  166. if ignore_imports:
  167. output = self.strip_future_imports(output)
  168. expected = self.strip_future_imports(expected)
  169. if isinstance(output, bytes) and not isinstance(expected, bytes):
  170. output = output.decode('utf-8')
  171. if isinstance(expected, bytes) and not isinstance(output, bytes):
  172. expected = expected.decode('utf-8')
  173. self.assertEqual(order_future_lines(output.rstrip()),
  174. expected.rstrip())
  175. def strip_future_imports(self, code):
  176. """
  177. Strips any of these import lines:
  178. from __future__ import <anything>
  179. from future <anything>
  180. from future.<anything>
  181. from builtins <anything>
  182. or any line containing:
  183. install_hooks()
  184. or:
  185. install_aliases()
  186. Limitation: doesn't handle imports split across multiple lines like
  187. this:
  188. from __future__ import (absolute_import, division, print_function,
  189. unicode_literals)
  190. """
  191. output = []
  192. # We need .splitlines(keepends=True), which doesn't exist on Py2,
  193. # so we use this instead:
  194. for line in code.split('\n'):
  195. if not (line.startswith('from __future__ import ')
  196. or line.startswith('from future ')
  197. or line.startswith('from builtins ')
  198. or 'install_hooks()' in line
  199. or 'install_aliases()' in line
  200. # but don't match "from future_builtins" :)
  201. or line.startswith('from future.')):
  202. output.append(line)
  203. return '\n'.join(output)
  204. def convert_check(self, before, expected, stages=(1, 2), all_imports=False,
  205. ignore_imports=True, from3=False, run=True,
  206. conservative=False):
  207. """
  208. Convenience method that calls convert() and compare().
  209. Reformats the code blocks automatically using the reformat_code()
  210. function.
  211. If all_imports is passed, we add the appropriate import headers
  212. for the stage(s) selected to the ``expected`` code-block, so they
  213. needn't appear repeatedly in the test code.
  214. If ignore_imports is True, ignores the presence of any lines
  215. beginning:
  216. from __future__ import ...
  217. from future import ...
  218. for the purpose of the comparison.
  219. """
  220. output = self.convert(before, stages=stages, all_imports=all_imports,
  221. from3=from3, run=run, conservative=conservative)
  222. if all_imports:
  223. headers = self.headers2 if 2 in stages else self.headers1
  224. else:
  225. headers = ''
  226. reformatted = reformat_code(expected)
  227. if headers in reformatted:
  228. headers = ''
  229. self.compare(output, headers + reformatted,
  230. ignore_imports=ignore_imports)
  231. def unchanged(self, code, **kwargs):
  232. """
  233. Convenience method to ensure the code is unchanged by the
  234. futurize process.
  235. """
  236. self.convert_check(code, code, **kwargs)
  237. def _write_test_script(self, code, filename='mytestscript.py'):
  238. """
  239. Dedents the given code (a multiline string) and writes it out to
  240. a file in a temporary folder like /tmp/tmpUDCn7x/mytestscript.py.
  241. """
  242. if isinstance(code, bytes):
  243. code = code.decode('utf-8')
  244. # Be explicit about encoding the temp file as UTF-8 (issue #63):
  245. with io.open(self.tempdir + filename, 'wt', encoding='utf-8') as f:
  246. f.write(dedent(code))
  247. def _read_test_script(self, filename='mytestscript.py'):
  248. with io.open(self.tempdir + filename, 'rt', encoding='utf-8') as f:
  249. newsource = f.read()
  250. return newsource
  251. def _futurize_test_script(self, filename='mytestscript.py', stages=(1, 2),
  252. all_imports=False, from3=False,
  253. conservative=False):
  254. params = []
  255. stages = list(stages)
  256. if all_imports:
  257. params.append('--all-imports')
  258. if from3:
  259. script = 'pasteurize.py'
  260. else:
  261. script = 'futurize.py'
  262. if stages == [1]:
  263. params.append('--stage1')
  264. elif stages == [2]:
  265. params.append('--stage2')
  266. else:
  267. assert stages == [1, 2]
  268. if conservative:
  269. params.append('--conservative')
  270. # No extra params needed
  271. # Absolute file path:
  272. fn = self.tempdir + filename
  273. call_args = [sys.executable, script] + params + ['-w', fn]
  274. try:
  275. output = check_output(call_args, stderr=STDOUT, env=self.env)
  276. except CalledProcessError as e:
  277. with open(fn) as f:
  278. msg = (
  279. 'Error running the command %s\n'
  280. '%s\n'
  281. 'Contents of file %s:\n'
  282. '\n'
  283. '%s') % (
  284. ' '.join(call_args),
  285. 'env=%s' % self.env,
  286. fn,
  287. '----\n%s\n----' % f.read(),
  288. )
  289. ErrorClass = (FuturizeError if 'futurize' in script else PasteurizeError)
  290. if not hasattr(e, 'output'):
  291. # The attribute CalledProcessError.output doesn't exist on Py2.6
  292. e.output = None
  293. raise ErrorClass(msg, e.returncode, e.cmd, output=e.output)
  294. return output
  295. def _run_test_script(self, filename='mytestscript.py',
  296. interpreter=sys.executable):
  297. # Absolute file path:
  298. fn = self.tempdir + filename
  299. try:
  300. output = check_output([interpreter, fn],
  301. env=self.env, stderr=STDOUT)
  302. except CalledProcessError as e:
  303. with open(fn) as f:
  304. msg = (
  305. 'Error running the command %s\n'
  306. '%s\n'
  307. 'Contents of file %s:\n'
  308. '\n'
  309. '%s') % (
  310. ' '.join([interpreter, fn]),
  311. 'env=%s' % self.env,
  312. fn,
  313. '----\n%s\n----' % f.read(),
  314. )
  315. if not hasattr(e, 'output'):
  316. # The attribute CalledProcessError.output doesn't exist on Py2.6
  317. e.output = None
  318. raise VerboseCalledProcessError(msg, e.returncode, e.cmd, output=e.output)
  319. return output
  320. # Decorator to skip some tests on Python 2.6 ...
  321. skip26 = unittest.skipIf(PY26, "this test is known to fail on Py2.6")
  322. def expectedFailurePY3(func):
  323. if not PY3:
  324. return func
  325. return unittest.expectedFailure(func)
  326. def expectedFailurePY26(func):
  327. if not PY26:
  328. return func
  329. return unittest.expectedFailure(func)
  330. def expectedFailurePY27(func):
  331. if not PY27:
  332. return func
  333. return unittest.expectedFailure(func)
  334. def expectedFailurePY2(func):
  335. if not PY2:
  336. return func
  337. return unittest.expectedFailure(func)
  338. # Renamed in Py3.3:
  339. if not hasattr(unittest.TestCase, 'assertRaisesRegex'):
  340. unittest.TestCase.assertRaisesRegex = unittest.TestCase.assertRaisesRegexp
  341. # From Py3.3:
  342. def assertRegex(self, text, expected_regex, msg=None):
  343. """Fail the test unless the text matches the regular expression."""
  344. if isinstance(expected_regex, (str, unicode)):
  345. assert expected_regex, "expected_regex must not be empty."
  346. expected_regex = re.compile(expected_regex)
  347. if not expected_regex.search(text):
  348. msg = msg or "Regex didn't match"
  349. msg = '%s: %r not found in %r' % (msg, expected_regex.pattern, text)
  350. raise self.failureException(msg)
  351. if not hasattr(unittest.TestCase, 'assertRegex'):
  352. bind_method(unittest.TestCase, 'assertRegex', assertRegex)
  353. class _AssertRaisesBaseContext(object):
  354. def __init__(self, expected, test_case, callable_obj=None,
  355. expected_regex=None):
  356. self.expected = expected
  357. self.test_case = test_case
  358. if callable_obj is not None:
  359. try:
  360. self.obj_name = callable_obj.__name__
  361. except AttributeError:
  362. self.obj_name = str(callable_obj)
  363. else:
  364. self.obj_name = None
  365. if isinstance(expected_regex, (bytes, str)):
  366. expected_regex = re.compile(expected_regex)
  367. self.expected_regex = expected_regex
  368. self.msg = None
  369. def _raiseFailure(self, standardMsg):
  370. msg = self.test_case._formatMessage(self.msg, standardMsg)
  371. raise self.test_case.failureException(msg)
  372. def handle(self, name, callable_obj, args, kwargs):
  373. """
  374. If callable_obj is None, assertRaises/Warns is being used as a
  375. context manager, so check for a 'msg' kwarg and return self.
  376. If callable_obj is not None, call it passing args and kwargs.
  377. """
  378. if callable_obj is None:
  379. self.msg = kwargs.pop('msg', None)
  380. return self
  381. with self:
  382. callable_obj(*args, **kwargs)
  383. class _AssertWarnsContext(_AssertRaisesBaseContext):
  384. """A context manager used to implement TestCase.assertWarns* methods."""
  385. def __enter__(self):
  386. # The __warningregistry__'s need to be in a pristine state for tests
  387. # to work properly.
  388. for v in sys.modules.values():
  389. if getattr(v, '__warningregistry__', None):
  390. v.__warningregistry__ = {}
  391. self.warnings_manager = warnings.catch_warnings(record=True)
  392. self.warnings = self.warnings_manager.__enter__()
  393. warnings.simplefilter("always", self.expected)
  394. return self
  395. def __exit__(self, exc_type, exc_value, tb):
  396. self.warnings_manager.__exit__(exc_type, exc_value, tb)
  397. if exc_type is not None:
  398. # let unexpected exceptions pass through
  399. return
  400. try:
  401. exc_name = self.expected.__name__
  402. except AttributeError:
  403. exc_name = str(self.expected)
  404. first_matching = None
  405. for m in self.warnings:
  406. w = m.message
  407. if not isinstance(w, self.expected):
  408. continue
  409. if first_matching is None:
  410. first_matching = w
  411. if (self.expected_regex is not None and
  412. not self.expected_regex.search(str(w))):
  413. continue
  414. # store warning for later retrieval
  415. self.warning = w
  416. self.filename = m.filename
  417. self.lineno = m.lineno
  418. return
  419. # Now we simply try to choose a helpful failure message
  420. if first_matching is not None:
  421. self._raiseFailure('"{}" does not match "{}"'.format(
  422. self.expected_regex.pattern, str(first_matching)))
  423. if self.obj_name:
  424. self._raiseFailure("{} not triggered by {}".format(exc_name,
  425. self.obj_name))
  426. else:
  427. self._raiseFailure("{} not triggered".format(exc_name))
  428. def assertWarns(self, expected_warning, callable_obj=None, *args, **kwargs):
  429. """Fail unless a warning of class warnClass is triggered
  430. by callable_obj when invoked with arguments args and keyword
  431. arguments kwargs. If a different type of warning is
  432. triggered, it will not be handled: depending on the other
  433. warning filtering rules in effect, it might be silenced, printed
  434. out, or raised as an exception.
  435. If called with callable_obj omitted or None, will return a
  436. context object used like this::
  437. with self.assertWarns(SomeWarning):
  438. do_something()
  439. An optional keyword argument 'msg' can be provided when assertWarns
  440. is used as a context object.
  441. The context manager keeps a reference to the first matching
  442. warning as the 'warning' attribute; similarly, the 'filename'
  443. and 'lineno' attributes give you information about the line
  444. of Python code from which the warning was triggered.
  445. This allows you to inspect the warning after the assertion::
  446. with self.assertWarns(SomeWarning) as cm:
  447. do_something()
  448. the_warning = cm.warning
  449. self.assertEqual(the_warning.some_attribute, 147)
  450. """
  451. context = _AssertWarnsContext(expected_warning, self, callable_obj)
  452. return context.handle('assertWarns', callable_obj, args, kwargs)
  453. if not hasattr(unittest.TestCase, 'assertWarns'):
  454. bind_method(unittest.TestCase, 'assertWarns', assertWarns)