gooderp18绿色标准版
Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

324 lines
11KB

  1. """Test result object"""
  2. import logging
  3. import collections
  4. import contextlib
  5. import inspect
  6. import re
  7. import time
  8. import traceback
  9. from typing import NamedTuple
  10. from . import case
  11. from .. import sql_db
  12. __unittest = True
  13. STDOUT_LINE = '\nStdout:\n%s'
  14. STDERR_LINE = '\nStderr:\n%s'
  15. stats_logger = logging.getLogger('odoo.tests.stats')
  16. class Stat(NamedTuple):
  17. time: float = 0.0
  18. queries: int = 0
  19. def __add__(self, other: 'Stat') -> 'Stat':
  20. if other == 0:
  21. return self
  22. if not isinstance(other, Stat):
  23. return NotImplemented
  24. return Stat(
  25. self.time + other.time,
  26. self.queries + other.queries,
  27. )
  28. _logger = logging.getLogger(__name__)
  29. _TEST_ID = re.compile(r"""
  30. ^
  31. odoo\.addons\.
  32. (?P<module>[^.]+)
  33. \.tests\.
  34. (?P<class>.+)
  35. \.
  36. (?P<method>[^.]+)
  37. $
  38. """, re.VERBOSE)
  39. class OdooTestResult(object):
  40. """
  41. This class in inspired from TextTestResult and modifies TestResult
  42. Instead of using a stream, we are using the logger.
  43. unittest.TestResult: Holder for test result information.
  44. Test results are automatically managed by the TestCase and TestSuite
  45. classes, and do not need to be explicitly manipulated by writers of tests.
  46. This version does not hold a list of failure but just a count since the failure is logged immediately
  47. This version is also simplied to better match our use cases
  48. """
  49. _previousTestClass = None
  50. _moduleSetUpFailed = False
  51. def __init__(self, stream=None, descriptions=None, verbosity=None):
  52. self.failures_count = 0
  53. self.errors_count = 0
  54. self.testsRun = 0
  55. self.skipped = 0
  56. self.tb_locals = False
  57. # custom
  58. self.time_start = None
  59. self.queries_start = None
  60. self._soft_fail = False
  61. self.had_failure = False
  62. self.stats = collections.defaultdict(Stat)
  63. def printErrors(self):
  64. "Called by TestRunner after test run"
  65. def startTest(self, test):
  66. "Called when the given test is about to be run"
  67. self.testsRun += 1
  68. self.log(logging.INFO, 'Starting %s ...', self.getDescription(test), test=test)
  69. self.time_start = time.time()
  70. self.queries_start = sql_db.sql_counter
  71. def stopTest(self, test):
  72. """Called when the given test has been run"""
  73. if stats_logger.isEnabledFor(logging.INFO):
  74. self.stats[test.id()] = Stat(
  75. time=time.time() - self.time_start,
  76. queries=sql_db.sql_counter - self.queries_start,
  77. )
  78. def addError(self, test, err):
  79. """Called when an error has occurred. 'err' is a tuple of values as
  80. returned by sys.exc_info().
  81. """
  82. if self._soft_fail:
  83. self.had_failure = True
  84. else:
  85. self.errors_count += 1
  86. self.logError("ERROR", test, err)
  87. def addFailure(self, test, err):
  88. """Called when an error has occurred. 'err' is a tuple of values as
  89. returned by sys.exc_info()."""
  90. if self._soft_fail:
  91. self.had_failure = True
  92. else:
  93. self.failures_count += 1
  94. self.logError("FAIL", test, err)
  95. def addSubTest(self, test, subtest, err):
  96. if err is not None:
  97. if issubclass(err[0], test.failureException):
  98. self.addFailure(subtest, err)
  99. else:
  100. self.addError(subtest, err)
  101. def addSuccess(self, test):
  102. "Called when a test has completed successfully"
  103. def addSkip(self, test, reason):
  104. """Called when a test is skipped."""
  105. self.skipped += 1
  106. self.log(logging.INFO, 'skipped %s : %s', self.getDescription(test), reason, test=test)
  107. def wasSuccessful(self):
  108. """Tells whether or not this result was a success."""
  109. # The hasattr check is for test_result's OldResult test. That
  110. # way this method works on objects that lack the attribute.
  111. # (where would such result intances come from? old stored pickles?)
  112. return self.failures_count == self.errors_count == 0
  113. def _exc_info_to_string(self, err, test):
  114. """Converts a sys.exc_info()-style tuple of values into a string."""
  115. exctype, value, tb = err
  116. # Skip test runner traceback levels
  117. while tb and self._is_relevant_tb_level(tb):
  118. tb = tb.tb_next
  119. if exctype is test.failureException:
  120. # Skip assert*() traceback levels
  121. length = self._count_relevant_tb_levels(tb)
  122. else:
  123. length = None
  124. tb_e = traceback.TracebackException(
  125. exctype, value, tb, limit=length, capture_locals=self.tb_locals)
  126. msgLines = list(tb_e.format())
  127. return ''.join(msgLines)
  128. def _is_relevant_tb_level(self, tb):
  129. return '__unittest' in tb.tb_frame.f_globals
  130. def _count_relevant_tb_levels(self, tb):
  131. length = 0
  132. while tb and not self._is_relevant_tb_level(tb):
  133. length += 1
  134. tb = tb.tb_next
  135. return length
  136. def __repr__(self):
  137. return f"<{self.__class__.__module__}.{self.__class__.__qualname__} run={self.testsRun} errors={self.errors_count} failures={self.failures_count}>"
  138. def __str__(self):
  139. return f'{self.failures_count} failed, {self.errors_count} error(s) of {self.testsRun} tests'
  140. @contextlib.contextmanager
  141. def soft_fail(self):
  142. self.had_failure = False
  143. self._soft_fail = True
  144. try:
  145. yield
  146. finally:
  147. self._soft_fail = False
  148. self.had_failure = False
  149. def update(self, other):
  150. """ Merges an other test result into this one, only updates contents
  151. :type other: OdooTestResult
  152. """
  153. self.failures_count += other.failures_count
  154. self.errors_count += other.errors_count
  155. self.testsRun += other.testsRun
  156. self.skipped += other.skipped
  157. self.stats.update(other.stats)
  158. def log(self, level, msg, *args, test=None, exc_info=None, extra=None, stack_info=False, caller_infos=None):
  159. """
  160. ``test`` is the running test case, ``caller_infos`` is
  161. (fn, lno, func, sinfo) (logger.findCaller format), see logger.log for
  162. the other parameters.
  163. """
  164. test = test or self
  165. while isinstance(test, case._SubTest) and test.test_case:
  166. test = test.test_case
  167. logger = logging.getLogger(test.__module__)
  168. try:
  169. caller_infos = caller_infos or logger.findCaller(stack_info)
  170. except ValueError:
  171. caller_infos = "(unknown file)", 0, "(unknown function)", None
  172. (fn, lno, func, sinfo) = caller_infos
  173. # using logger.log makes it difficult to spot-replace findCaller in
  174. # order to provide useful location information (the problematic spot
  175. # inside the test function), so use lower-level functions instead
  176. if logger.isEnabledFor(level):
  177. record = logger.makeRecord(logger.name, level, fn, lno, msg, args, exc_info, func, extra, sinfo)
  178. logger.handle(record)
  179. def log_stats(self):
  180. if not stats_logger.isEnabledFor(logging.INFO):
  181. return
  182. details = stats_logger.isEnabledFor(logging.DEBUG)
  183. stats_tree = collections.defaultdict(Stat)
  184. counts = collections.Counter()
  185. for test, stat in self.stats.items():
  186. r = _TEST_ID.match(test)
  187. if not r: # upgrade has tests at weird paths, ignore them
  188. continue
  189. stats_tree[r['module']] += stat
  190. counts[r['module']] += 1
  191. if details:
  192. stats_tree['%(module)s.%(class)s' % r] += stat
  193. stats_tree['%(module)s.%(class)s.%(method)s' % r] += stat
  194. if details:
  195. stats_logger.debug('Detailed Tests Report:\n%s', ''.join(
  196. f'\t{test}: {stats.time:.2f}s {stats.queries} queries\n'
  197. for test, stats in sorted(stats_tree.items())
  198. ))
  199. else:
  200. for module, stat in sorted(stats_tree.items()):
  201. stats_logger.info(
  202. "%s: %d tests %.2fs %d queries",
  203. module, counts[module],
  204. stat.time, stat.queries
  205. )
  206. def getDescription(self, test):
  207. if isinstance(test, case._SubTest):
  208. return 'Subtest %s.%s %s' % (test.test_case.__class__.__qualname__, test.test_case._testMethodName, test._subDescription())
  209. if isinstance(test, case.TestCase):
  210. # since we have the module name in the logger, this will avoid to duplicate module info in log line
  211. # we only apply this for TestCase since we can receive error handler or other special case
  212. return "%s.%s" % (test.__class__.__qualname__, test._testMethodName)
  213. return str(test)
  214. @contextlib.contextmanager
  215. def collectStats(self, test_id):
  216. queries_before = sql_db.sql_counter
  217. time_start = time.time()
  218. yield
  219. self.stats[test_id] += Stat(
  220. time=time.time() - time_start,
  221. queries=sql_db.sql_counter - queries_before,
  222. )
  223. def logError(self, flavour, test, error):
  224. err = self._exc_info_to_string(error, test)
  225. caller_infos = self.getErrorCallerInfo(error, test)
  226. self.log(logging.INFO, '=' * 70, test=test, caller_infos=caller_infos) # keep this as info !!!!!!
  227. self.log(logging.ERROR, "%s: %s\n%s", flavour, self.getDescription(test), err, test=test, caller_infos=caller_infos)
  228. def getErrorCallerInfo(self, error, test):
  229. """
  230. :param error: A tuple (exctype, value, tb) as returned by sys.exc_info().
  231. :param test: A TestCase that created this error.
  232. :returns: a tuple (fn, lno, func, sinfo) matching the logger findCaller format or None
  233. """
  234. # only handle TestCase here. test can be an _ErrorHolder in some case (setup/teardown class errors)
  235. if not isinstance(test, case.TestCase):
  236. return
  237. _, _, error_traceback = error
  238. # move upwards the subtest hierarchy to find the real test
  239. while isinstance(test, case._SubTest) and test.test_case:
  240. test = test.test_case
  241. method_tb = None
  242. file_tb = None
  243. filename = inspect.getfile(type(test))
  244. # Note: since _ErrorCatcher was introduced, we could always take the
  245. # last frame, keeping the check on the test method for safety.
  246. # Fallbacking on file for cleanup file shoud always be correct to a
  247. # minimal working version would be
  248. #
  249. # infos_tb = error_traceback
  250. # while infos_tb.tb_next()
  251. # infos_tb = infos_tb.tb_next()
  252. #
  253. while error_traceback:
  254. code = error_traceback.tb_frame.f_code
  255. if code.co_name in (test._testMethodName, 'setUp', 'tearDown'):
  256. method_tb = error_traceback
  257. if code.co_filename == filename:
  258. file_tb = error_traceback
  259. error_traceback = error_traceback.tb_next
  260. infos_tb = method_tb or file_tb
  261. if infos_tb:
  262. code = infos_tb.tb_frame.f_code
  263. lineno = infos_tb.tb_lineno
  264. filename = code.co_filename
  265. method = test._testMethodName
  266. return (filename, lineno, method, None)
上海开阖软件有限公司 沪ICP备12045867号-1