123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539 |
- from __future__ import print_function, absolute_import
- import os
- import tempfile
- import unittest
- import sys
- import re
- import warnings
- import io
- from textwrap import dedent
- from future.utils import bind_method, PY26, PY3, PY2, PY27
- from future.moves.subprocess import check_output, STDOUT, CalledProcessError
- if PY26:
- import unittest2 as unittest
- def reformat_code(code):
- """
- Removes any leading \n and dedents.
- """
- if code.startswith('\n'):
- code = code[1:]
- return dedent(code)
- def order_future_lines(code):
- """
- Returns the code block with any ``__future__`` import lines sorted, and
- then any ``future`` import lines sorted, then any ``builtins`` import lines
- sorted.
- This only sorts the lines within the expected blocks.
- See test_order_future_lines() for an example.
- """
- # We need .splitlines(keepends=True), which doesn't exist on Py2,
- # so we use this instead:
- lines = code.split('\n')
- uufuture_line_numbers = [i for i, line in enumerate(lines)
- if line.startswith('from __future__ import ')]
- future_line_numbers = [i for i, line in enumerate(lines)
- if line.startswith('from future')
- or line.startswith('from past')]
- builtins_line_numbers = [i for i, line in enumerate(lines)
- if line.startswith('from builtins')]
- assert code.lstrip() == code, ('internal usage error: '
- 'dedent the code before calling order_future_lines()')
- def mymax(numbers):
- return max(numbers) if len(numbers) > 0 else 0
- def mymin(numbers):
- return min(numbers) if len(numbers) > 0 else float('inf')
- assert mymax(uufuture_line_numbers) <= mymin(future_line_numbers), \
- 'the __future__ and future imports are out of order'
- # assert mymax(future_line_numbers) <= mymin(builtins_line_numbers), \
- # 'the future and builtins imports are out of order'
- uul = sorted([lines[i] for i in uufuture_line_numbers])
- sorted_uufuture_lines = dict(zip(uufuture_line_numbers, uul))
- fl = sorted([lines[i] for i in future_line_numbers])
- sorted_future_lines = dict(zip(future_line_numbers, fl))
- bl = sorted([lines[i] for i in builtins_line_numbers])
- sorted_builtins_lines = dict(zip(builtins_line_numbers, bl))
- # Replace the old unsorted "from __future__ import ..." lines with the
- # new sorted ones:
- new_lines = []
- for i in range(len(lines)):
- if i in uufuture_line_numbers:
- new_lines.append(sorted_uufuture_lines[i])
- elif i in future_line_numbers:
- new_lines.append(sorted_future_lines[i])
- elif i in builtins_line_numbers:
- new_lines.append(sorted_builtins_lines[i])
- else:
- new_lines.append(lines[i])
- return '\n'.join(new_lines)
- class VerboseCalledProcessError(CalledProcessError):
- """
- Like CalledProcessError, but it displays more information (message and
- script output) for diagnosing test failures etc.
- """
- def __init__(self, msg, returncode, cmd, output=None):
- self.msg = msg
- self.returncode = returncode
- self.cmd = cmd
- self.output = output
- def __str__(self):
- return ("Command '%s' failed with exit status %d\nMessage: %s\nOutput: %s"
- % (self.cmd, self.returncode, self.msg, self.output))
- class FuturizeError(VerboseCalledProcessError):
- pass
- class PasteurizeError(VerboseCalledProcessError):
- pass
- class CodeHandler(unittest.TestCase):
- """
- Handy mixin for test classes for writing / reading / futurizing /
- running .py files in the test suite.
- """
- def setUp(self):
- """
- The outputs from the various futurize stages should have the
- following headers:
- """
- # After stage1:
- # TODO: use this form after implementing a fixer to consolidate
- # __future__ imports into a single line:
- # self.headers1 = """
- # from __future__ import absolute_import, division, print_function
- # """
- self.headers1 = reformat_code("""
- from __future__ import absolute_import
- from __future__ import division
- from __future__ import print_function
- """)
- # After stage2 --all-imports:
- # TODO: use this form after implementing a fixer to consolidate
- # __future__ imports into a single line:
- # self.headers2 = """
- # from __future__ import (absolute_import, division,
- # print_function, unicode_literals)
- # from future import standard_library
- # from future.builtins import *
- # """
- self.headers2 = reformat_code("""
- from __future__ import absolute_import
- from __future__ import division
- from __future__ import print_function
- from __future__ import unicode_literals
- from future import standard_library
- standard_library.install_aliases()
- from builtins import *
- """)
- self.interpreters = [sys.executable]
- self.tempdir = tempfile.mkdtemp() + os.path.sep
- pypath = os.getenv('PYTHONPATH')
- if pypath:
- self.env = {'PYTHONPATH': os.getcwd() + os.pathsep + pypath}
- else:
- self.env = {'PYTHONPATH': os.getcwd()}
- def convert(self, code, stages=(1, 2), all_imports=False, from3=False,
- reformat=True, run=True, conservative=False):
- """
- Converts the code block using ``futurize`` and returns the
- resulting code.
- Passing stages=[1] or stages=[2] passes the flag ``--stage1`` or
- ``stage2`` to ``futurize``. Passing both stages runs ``futurize``
- with both stages by default.
- If from3 is False, runs ``futurize``, converting from Python 2 to
- both 2 and 3. If from3 is True, runs ``pasteurize`` to convert
- from Python 3 to both 2 and 3.
- Optionally reformats the code block first using the reformat() function.
- If run is True, runs the resulting code under all Python
- interpreters in self.interpreters.
- """
- if reformat:
- code = reformat_code(code)
- self._write_test_script(code)
- self._futurize_test_script(stages=stages, all_imports=all_imports,
- from3=from3, conservative=conservative)
- output = self._read_test_script()
- if run:
- for interpreter in self.interpreters:
- _ = self._run_test_script(interpreter=interpreter)
- return output
- def compare(self, output, expected, ignore_imports=True):
- """
- Compares whether the code blocks are equal. If not, raises an
- exception so the test fails. Ignores any trailing whitespace like
- blank lines.
- If ignore_imports is True, passes the code blocks into the
- strip_future_imports method.
- If one code block is a unicode string and the other a
- byte-string, it assumes the byte-string is encoded as utf-8.
- """
- if ignore_imports:
- output = self.strip_future_imports(output)
- expected = self.strip_future_imports(expected)
- if isinstance(output, bytes) and not isinstance(expected, bytes):
- output = output.decode('utf-8')
- if isinstance(expected, bytes) and not isinstance(output, bytes):
- expected = expected.decode('utf-8')
- self.assertEqual(order_future_lines(output.rstrip()),
- expected.rstrip())
- def strip_future_imports(self, code):
- """
- Strips any of these import lines:
- from __future__ import <anything>
- from future <anything>
- from future.<anything>
- from builtins <anything>
- or any line containing:
- install_hooks()
- or:
- install_aliases()
- Limitation: doesn't handle imports split across multiple lines like
- this:
- from __future__ import (absolute_import, division, print_function,
- unicode_literals)
- """
- output = []
- # We need .splitlines(keepends=True), which doesn't exist on Py2,
- # so we use this instead:
- for line in code.split('\n'):
- if not (line.startswith('from __future__ import ')
- or line.startswith('from future ')
- or line.startswith('from builtins ')
- or 'install_hooks()' in line
- or 'install_aliases()' in line
- # but don't match "from future_builtins" :)
- or line.startswith('from future.')):
- output.append(line)
- return '\n'.join(output)
- def convert_check(self, before, expected, stages=(1, 2), all_imports=False,
- ignore_imports=True, from3=False, run=True,
- conservative=False):
- """
- Convenience method that calls convert() and compare().
- Reformats the code blocks automatically using the reformat_code()
- function.
- If all_imports is passed, we add the appropriate import headers
- for the stage(s) selected to the ``expected`` code-block, so they
- needn't appear repeatedly in the test code.
- If ignore_imports is True, ignores the presence of any lines
- beginning:
- from __future__ import ...
- from future import ...
- for the purpose of the comparison.
- """
- output = self.convert(before, stages=stages, all_imports=all_imports,
- from3=from3, run=run, conservative=conservative)
- if all_imports:
- headers = self.headers2 if 2 in stages else self.headers1
- else:
- headers = ''
- reformatted = reformat_code(expected)
- if headers in reformatted:
- headers = ''
- self.compare(output, headers + reformatted,
- ignore_imports=ignore_imports)
- def unchanged(self, code, **kwargs):
- """
- Convenience method to ensure the code is unchanged by the
- futurize process.
- """
- self.convert_check(code, code, **kwargs)
- def _write_test_script(self, code, filename='mytestscript.py'):
- """
- Dedents the given code (a multiline string) and writes it out to
- a file in a temporary folder like /tmp/tmpUDCn7x/mytestscript.py.
- """
- if isinstance(code, bytes):
- code = code.decode('utf-8')
- # Be explicit about encoding the temp file as UTF-8 (issue #63):
- with io.open(self.tempdir + filename, 'wt', encoding='utf-8') as f:
- f.write(dedent(code))
- def _read_test_script(self, filename='mytestscript.py'):
- with io.open(self.tempdir + filename, 'rt', encoding='utf-8') as f:
- newsource = f.read()
- return newsource
- def _futurize_test_script(self, filename='mytestscript.py', stages=(1, 2),
- all_imports=False, from3=False,
- conservative=False):
- params = []
- stages = list(stages)
- if all_imports:
- params.append('--all-imports')
- if from3:
- script = 'pasteurize.py'
- else:
- script = 'futurize.py'
- if stages == [1]:
- params.append('--stage1')
- elif stages == [2]:
- params.append('--stage2')
- else:
- assert stages == [1, 2]
- if conservative:
- params.append('--conservative')
- # No extra params needed
- # Absolute file path:
- fn = self.tempdir + filename
- call_args = [sys.executable, script] + params + ['-w', fn]
- try:
- output = check_output(call_args, stderr=STDOUT, env=self.env)
- except CalledProcessError as e:
- with open(fn) as f:
- msg = (
- 'Error running the command %s\n'
- '%s\n'
- 'Contents of file %s:\n'
- '\n'
- '%s') % (
- ' '.join(call_args),
- 'env=%s' % self.env,
- fn,
- '----\n%s\n----' % f.read(),
- )
- ErrorClass = (FuturizeError if 'futurize' in script else PasteurizeError)
- if not hasattr(e, 'output'):
- # The attribute CalledProcessError.output doesn't exist on Py2.6
- e.output = None
- raise ErrorClass(msg, e.returncode, e.cmd, output=e.output)
- return output
- def _run_test_script(self, filename='mytestscript.py',
- interpreter=sys.executable):
- # Absolute file path:
- fn = self.tempdir + filename
- try:
- output = check_output([interpreter, fn],
- env=self.env, stderr=STDOUT)
- except CalledProcessError as e:
- with open(fn) as f:
- msg = (
- 'Error running the command %s\n'
- '%s\n'
- 'Contents of file %s:\n'
- '\n'
- '%s') % (
- ' '.join([interpreter, fn]),
- 'env=%s' % self.env,
- fn,
- '----\n%s\n----' % f.read(),
- )
- if not hasattr(e, 'output'):
- # The attribute CalledProcessError.output doesn't exist on Py2.6
- e.output = None
- raise VerboseCalledProcessError(msg, e.returncode, e.cmd, output=e.output)
- return output
- # Decorator to skip some tests on Python 2.6 ...
- skip26 = unittest.skipIf(PY26, "this test is known to fail on Py2.6")
- def expectedFailurePY3(func):
- if not PY3:
- return func
- return unittest.expectedFailure(func)
- def expectedFailurePY26(func):
- if not PY26:
- return func
- return unittest.expectedFailure(func)
- def expectedFailurePY27(func):
- if not PY27:
- return func
- return unittest.expectedFailure(func)
- def expectedFailurePY2(func):
- if not PY2:
- return func
- return unittest.expectedFailure(func)
- # Renamed in Py3.3:
- if not hasattr(unittest.TestCase, 'assertRaisesRegex'):
- unittest.TestCase.assertRaisesRegex = unittest.TestCase.assertRaisesRegexp
- # From Py3.3:
- def assertRegex(self, text, expected_regex, msg=None):
- """Fail the test unless the text matches the regular expression."""
- if isinstance(expected_regex, (str, unicode)):
- assert expected_regex, "expected_regex must not be empty."
- expected_regex = re.compile(expected_regex)
- if not expected_regex.search(text):
- msg = msg or "Regex didn't match"
- msg = '%s: %r not found in %r' % (msg, expected_regex.pattern, text)
- raise self.failureException(msg)
- if not hasattr(unittest.TestCase, 'assertRegex'):
- bind_method(unittest.TestCase, 'assertRegex', assertRegex)
- class _AssertRaisesBaseContext(object):
- def __init__(self, expected, test_case, callable_obj=None,
- expected_regex=None):
- self.expected = expected
- self.test_case = test_case
- if callable_obj is not None:
- try:
- self.obj_name = callable_obj.__name__
- except AttributeError:
- self.obj_name = str(callable_obj)
- else:
- self.obj_name = None
- if isinstance(expected_regex, (bytes, str)):
- expected_regex = re.compile(expected_regex)
- self.expected_regex = expected_regex
- self.msg = None
- def _raiseFailure(self, standardMsg):
- msg = self.test_case._formatMessage(self.msg, standardMsg)
- raise self.test_case.failureException(msg)
- def handle(self, name, callable_obj, args, kwargs):
- """
- If callable_obj is None, assertRaises/Warns is being used as a
- context manager, so check for a 'msg' kwarg and return self.
- If callable_obj is not None, call it passing args and kwargs.
- """
- if callable_obj is None:
- self.msg = kwargs.pop('msg', None)
- return self
- with self:
- callable_obj(*args, **kwargs)
- class _AssertWarnsContext(_AssertRaisesBaseContext):
- """A context manager used to implement TestCase.assertWarns* methods."""
- def __enter__(self):
- # The __warningregistry__'s need to be in a pristine state for tests
- # to work properly.
- for v in sys.modules.values():
- if getattr(v, '__warningregistry__', None):
- v.__warningregistry__ = {}
- self.warnings_manager = warnings.catch_warnings(record=True)
- self.warnings = self.warnings_manager.__enter__()
- warnings.simplefilter("always", self.expected)
- return self
- def __exit__(self, exc_type, exc_value, tb):
- self.warnings_manager.__exit__(exc_type, exc_value, tb)
- if exc_type is not None:
- # let unexpected exceptions pass through
- return
- try:
- exc_name = self.expected.__name__
- except AttributeError:
- exc_name = str(self.expected)
- first_matching = None
- for m in self.warnings:
- w = m.message
- if not isinstance(w, self.expected):
- continue
- if first_matching is None:
- first_matching = w
- if (self.expected_regex is not None and
- not self.expected_regex.search(str(w))):
- continue
- # store warning for later retrieval
- self.warning = w
- self.filename = m.filename
- self.lineno = m.lineno
- return
- # Now we simply try to choose a helpful failure message
- if first_matching is not None:
- self._raiseFailure('"{}" does not match "{}"'.format(
- self.expected_regex.pattern, str(first_matching)))
- if self.obj_name:
- self._raiseFailure("{} not triggered by {}".format(exc_name,
- self.obj_name))
- else:
- self._raiseFailure("{} not triggered".format(exc_name))
- def assertWarns(self, expected_warning, callable_obj=None, *args, **kwargs):
- """Fail unless a warning of class warnClass is triggered
- by callable_obj when invoked with arguments args and keyword
- arguments kwargs. If a different type of warning is
- triggered, it will not be handled: depending on the other
- warning filtering rules in effect, it might be silenced, printed
- out, or raised as an exception.
- If called with callable_obj omitted or None, will return a
- context object used like this::
- with self.assertWarns(SomeWarning):
- do_something()
- An optional keyword argument 'msg' can be provided when assertWarns
- is used as a context object.
- The context manager keeps a reference to the first matching
- warning as the 'warning' attribute; similarly, the 'filename'
- and 'lineno' attributes give you information about the line
- of Python code from which the warning was triggered.
- This allows you to inspect the warning after the assertion::
- with self.assertWarns(SomeWarning) as cm:
- do_something()
- the_warning = cm.warning
- self.assertEqual(the_warning.some_attribute, 147)
- """
- context = _AssertWarnsContext(expected_warning, self, callable_obj)
- return context.handle('assertWarns', callable_obj, args, kwargs)
- if not hasattr(unittest.TestCase, 'assertWarns'):
- bind_method(unittest.TestCase, 'assertWarns', assertWarns)
|