Source code for lsst.utils.tests

#
# LSST Data Management System
#
# Copyright 2008-2017  AURA/LSST.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program.  If not,
# see <https://www.lsstcorp.org/LegalNotices/>.
#
"""Support code for running unit tests"""
from __future__ import print_function
from __future__ import division
from builtins import zip
from builtins import range

import contextlib
import gc
import inspect
import os
import subprocess
import sys
import unittest
import warnings
import numpy
import functools
import tempfile

__all__ = ["init", "run", "MemoryTestCase", "ExecutablesTestCase", "getTempFilePath",
           "TestCase", "assertFloatsAlmostEqual", "assertFloatsNotEqual", "assertFloatsEqual"]

# File descriptor leak test will be skipped if psutil can not be imported
try:
    import psutil
except ImportError:
    psutil = None

try:
    import lsst.daf.base as dafBase
except ImportError:
    dafBase = None

try:
    type(memId0)
except NameError:
    memId0 = 0                          # ignore leaked blocks with IDs before memId0
    nleakPrintMax = 20                  # maximum number of leaked blocks to print

# Initialize the list of open files to an empty set
open_files = set()


def _get_open_files():
    """Return a set containing the list of open files."""
    if psutil is None:
        return set()
    return set(p.path for p in psutil.Process().open_files())


[docs]def init(): """Initialize the memory tester""" global memId0 global open_files if dafBase: memId0 = dafBase.Citizen.getNextMemId() # used by MemoryTestCase # Reset the list of open files open_files = _get_open_files()
[docs]def run(suite, exit=True): """Exit with the status code resulting from running the provided test suite""" if unittest.TextTestRunner().run(suite).wasSuccessful(): status = 0 else: status = 1 if exit: sys.exit(status) else: return status
def sort_tests(tests): """Go through the supplied sequence of test suites and sort them to ensure that MemoryTestCases are at the end of the test list. Returns a combined TestSuite.""" suite = unittest.TestSuite() memtests = [] for test_suite in tests: try: # Just test the first test method in the suite for MemoryTestCase # Use loop rather than next as it is possible for a test class # to not have any test methods and the Python community prefers # for loops over catching a StopIteration exception. bases = None for method in test_suite: bases = inspect.getmro(method.__class__) break if bases is not None and MemoryTestCase in bases: memtests.append(test_suite) else: suite.addTests(test_suite) except TypeError: if isinstance(test_suite, MemoryTestCase): memtests.append(test_suite) else: suite.addTest(test_suite) suite.addTests(memtests) return suite def suiteClassWrapper(tests): return unittest.TestSuite(sort_tests(tests)) # Replace the suiteClass callable in the defaultTestLoader # so that we can reorder the test ordering. This will have # no effect if no memory test cases are found. unittest.defaultTestLoader.suiteClass = suiteClassWrapper
[docs]class MemoryTestCase(unittest.TestCase): """Check for memory leaks since memId0 was allocated"""
[docs] def setUp(self): pass
@classmethod
[docs] def tearDownClass(cls): """Reset the leak counter when the tests have been completed""" init()
[docs] def testLeaks(self): """Check for memory leaks in the preceding tests""" if dafBase: gc.collect() global memId0, nleakPrintMax nleak = dafBase.Citizen.census(0, memId0) if nleak != 0: plural = "s" if nleak != 1 else "" print("\n%d Object%s leaked:" % (nleak, plural)) if nleak <= nleakPrintMax: print(dafBase.Citizen.census(memId0)) else: census = dafBase.Citizen.census() print("...") for i in range(nleakPrintMax - 1, -1, -1): print(census[i].repr()) self.fail("Leaked %d block%s" % (nleak, plural))
[docs] def testFileDescriptorLeaks(self): if psutil is None: self.skipTest("Unable to test file descriptor leaks. psutil unavailable.") gc.collect() global open_files now_open = _get_open_files() # Some files are opened out of the control of the stack. now_open = set(f for f in now_open if not f.endswith(".car") and not f.startswith("/proc/") and not f.endswith(".ttf") and f != "/var/lib/sss/mc/passwd" and not f.endswith("astropy.log")) diff = now_open.difference(open_files) if diff: for f in diff: print("File open: %s" % f) self.fail("Failed to close %d file%s" % (len(diff), "s" if len(diff) != 1 else ""))
[docs]class ExecutablesTestCase(unittest.TestCase): """Test that executables can be run and return good status. The test methods are dynamically created. Callers must subclass this class in their own test file and invoke the create_executable_tests() class method to register the tests. """ TESTS_DISCOVERED = -1 @classmethod
[docs] def setUpClass(cls): """Abort testing if automated test creation was enabled and yet not tests were found.""" if cls.TESTS_DISCOVERED == 0: raise Exception("No executables discovered.")
[docs] def testSanity(self): """This test exists to ensure that there is at least one test to be executed. This allows the test runner to trigger the class set up machinery to test whether there are some executables to test.""" pass
[docs] def assertExecutable(self, executable, root_dir=None, args=None, msg=None): """Check an executable runs and returns good status. Prints output to standard out. On bad exit status the test fails. If the executable can not be located the test is skipped. Parameters ---------- executable : `str` Path to an executable. root_dir is not used if this is an absolute path. root_dir : `str` Directory containing exe. Ignored if None. args : list or tuple Arguments to be provided to the executable. msg : `str` Message to use when the test fails. Can be None for default message. """ if root_dir is not None and not os.path.isabs(executable): executable = os.path.join(root_dir, executable) # Form the argument list for subprocess sp_args = [executable] argstr = "no arguments" if args is not None: sp_args.extend(args) argstr = 'arguments "' + " ".join(args) + '"' print("Running executable '{}' with {}...".format(executable, argstr)) if not os.path.exists(executable): self.skipTest("Executable {} is unexpectedly missing".format(executable)) failmsg = None try: output = subprocess.check_output(sp_args) except subprocess.CalledProcessError as e: output = e.output failmsg = "Bad exit status from '{}': {}".format(executable, e.returncode) print(output.decode('utf-8')) if failmsg: if msg is None: msg = failmsg self.fail(msg)
@classmethod def _build_test_method(cls, executable, root_dir): """Build a test method and attach to class. The method is built for the supplied excutable located in the supplied root directory. cls._build_test_method(root_dir, executable) Parameters ---------- cls : `object` The class in which to create the tests. executable : `str` Name of executable. Can be absolute path. root_dir : `str` Path to executable. Not used if executable path is absolute. """ if not os.path.isabs(executable): executable = os.path.abspath(os.path.join(root_dir, executable)) # Create the test name from the executable path. test_name = "test_exe_" + executable.replace("/", "_") # This is the function that will become the test method def test_executable_runs(*args): self = args[0] self.assertExecutable(executable) # Give it a name and attach it to the class test_executable_runs.__name__ = test_name setattr(cls, test_name, test_executable_runs) @classmethod
[docs] def create_executable_tests(cls, ref_file, executables=None): """Discover executables to test and create corresponding test methods. Scans the directory containing the supplied reference file (usually __file__ supplied from the test class) to look for executables. If executables are found a test method is created for each one. That test method will run the executable and check the returned value. Executable scripts with a .py extension and shared libraries are ignored by the scanner. This class method must be called before test discovery. Example: cls.create_executable_tests(__file__) The list of executables can be overridden by passing in a sequence of explicit executables that should be tested. If an item in the sequence can not be found the test will be configured to skip rather than fail. """ # Get the search directory from the reference file ref_dir = os.path.abspath(os.path.dirname(ref_file)) if executables is None: # Look for executables to test by walking the tree executables = [] for root, dirs, files in os.walk(ref_dir): for f in files: # Skip Python files. Shared libraries are executable. if not f.endswith(".py") and not f.endswith(".so"): full_path = os.path.join(root, f) if os.access(full_path, os.X_OK): executables.append(full_path) # Store the number of tests found for later assessment. # Do not raise an exception if we have no executables as this would # cause the testing to abort before the test runner could properly # integrate it into the failure report. cls.TESTS_DISCOVERED = len(executables) # Create the test functions and attach them to the class for e in executables: cls._build_test_method(e, ref_dir)
def findFileFromRoot(ifile): """Find file which is specified as a path relative to the toplevel directory; we start in $cwd and walk up until we find the file (or throw IOError if it doesn't exist) This is useful for running tests that may be run from _dir_/tests or _dir_""" if os.path.isfile(ifile): return ifile ofile = None file = ifile while file != "": dirname, basename = os.path.split(file) if ofile: ofile = os.path.join(basename, ofile) else: ofile = basename if os.path.isfile(ofile): return ofile file = dirname raise IOError("Can't find %s" % ifile) @contextlib.contextmanager
[docs]def getTempFilePath(ext, expectOutput=True): """Return a path suitable for a temporary file and try to delete the file on success If the with block completes successfully then the file is deleted, if possible; failure results in a printed warning. If a file is remains when it should not, a RuntimeError exception is raised. This exception is also raised if a file is not present on context manager exit when one is expected to exist. If the block exits with an exception the file if left on disk so it can be examined. The file name has a random component such that nested context managers can be used with the same file suffix. Parameters ---------- ext : `str` file name extension, e.g. ".fits" expectOutput : `bool` If true, a file should be created within the context manager. If false, a file should not be present when the context manager exits. Returns ------- `str` Path for a temporary file. The path is a combination of the caller's file path and the name of the top-level function Notes ----- :: # file tests/testFoo.py import unittest import lsst.utils.tests class FooTestCase(unittest.TestCase): def testBasics(self): self.runTest() def runTest(self): with lsst.utils.tests.getTempFilePath(".fits") as tmpFile: # if tests/.tests exists then tmpFile = "tests/.tests/testFoo_testBasics.fits" # otherwise tmpFile = "testFoo_testBasics.fits" ... # at the end of this "with" block the path tmpFile will be deleted, but only if # the file exists and the "with" block terminated normally (rather than with an exception) ... """ stack = inspect.stack() # get name of first function in the file for i in range(2, len(stack)): frameInfo = inspect.getframeinfo(stack[i][0]) if i == 2: callerFilePath = frameInfo.filename callerFuncName = frameInfo.function elif callerFilePath == frameInfo.filename: # this function called the previous function callerFuncName = frameInfo.function else: break callerDir, callerFileNameWithExt = os.path.split(callerFilePath) callerFileName = os.path.splitext(callerFileNameWithExt)[0] outDir = os.path.join(callerDir, ".tests") if not os.path.isdir(outDir): outDir = "" prefix = "%s_%s-" % (callerFileName, callerFuncName) outPath = tempfile.mktemp(dir=outDir, suffix=ext, prefix=prefix) if os.path.exists(outPath): # There should not be a file there given the randomizer. Warn and remove. # Use stacklevel 3 so that the warning is reported from the end of the with block warnings.warn("Unexpectedly found pre-existing tempfile named %r" % (outPath,), stacklevel=3) try: os.remove(outPath) except OSError: pass yield outPath fileExists = os.path.exists(outPath) if expectOutput: if not fileExists: raise RuntimeError("Temp file expected named {} but none found".format(outPath)) else: if fileExists: raise RuntimeError("Unexpectedly discovered temp file named {}".format(outPath)) # Try to clean up the file regardless if fileExists: try: os.remove(outPath) except OSError as e: # Use stacklevel 3 so that the warning is reported from the end of the with block warnings.warn("Warning: could not remove file %r: %s" % (outPath, e), stacklevel=3)
[docs]class TestCase(unittest.TestCase): """Subclass of unittest.TestCase that adds some custom assertions for convenience. """
def inTestCase(func): """A decorator to add a free function to our custom TestCase class, while also making it available as a free function. """ setattr(TestCase, func.__name__, func) return func @inTestCase def assertRaisesLsstCpp(testcase, excClass, callableObj, *args, **kwargs): """.. note:: Deprecated in 12_0""" warnings.warn("assertRaisesLsstCpp is deprecated; please just use TestCase.assertRaises", DeprecationWarning, stacklevel=2) return testcase.assertRaises(excClass, callableObj, *args, **kwargs) def debugger(*exceptions): """Decorator to enter the debugger when there's an uncaught exception To use, just slap a "@debugger()" on your function. You may provide specific exception classes to catch as arguments to the decorator function, e.g., "@debugger(RuntimeError, NotImplementedError)". This defaults to just 'AssertionError', for use on unittest.TestCase methods. Code provided by "Rosh Oxymoron" on StackOverflow: http://stackoverflow.com/questions/4398967/python-unit-testing-automatically-running-the-debugger-when-a-test-fails """ if not exceptions: exceptions = (AssertionError, ) def decorator(f): @functools.wraps(f) def wrapper(*args, **kwargs): try: return f(*args, **kwargs) except exceptions: import sys import pdb pdb.post_mortem(sys.exc_info()[2]) return wrapper return decorator def plotImageDiff(lhs, rhs, bad=None, diff=None, plotFileName=None): """Plot the comparison of two 2-d NumPy arrays. NOTE: this method uses matplotlib and imports it internally; it should be wrapped in a try/except block within packages that do not depend on matplotlib (including utils). Parameters ---------- lhs : `numpy.ndarray` LHS values to compare; a 2-d NumPy array rhs : `numpy.ndarray` RHS values to compare; a 2-d NumPy array bad : `numpy.ndarray` A 2-d boolean NumPy array of values to emphasize in the plots diff : `numpy.ndarray` difference array; a 2-d NumPy array, or None to show lhs-rhs plotFileName : `str` Filename to save the plot to. If None, the plot will be displayed in a window. """ from matplotlib import pyplot if diff is None: diff = lhs - rhs pyplot.figure() if bad is not None: # make an rgba image that's red and transparent where not bad badImage = numpy.zeros(bad.shape + (4,), dtype=numpy.uint8) badImage[:, :, 0] = 255 badImage[:, :, 1] = 0 badImage[:, :, 2] = 0 badImage[:, :, 3] = 255*bad vmin1 = numpy.minimum(numpy.min(lhs), numpy.min(rhs)) vmax1 = numpy.maximum(numpy.max(lhs), numpy.max(rhs)) vmin2 = numpy.min(diff) vmax2 = numpy.max(diff) for n, (image, title) in enumerate([(lhs, "lhs"), (rhs, "rhs"), (diff, "diff")]): pyplot.subplot(2, 3, n + 1) im1 = pyplot.imshow(image, cmap=pyplot.cm.gray, interpolation='nearest', origin='lower', vmin=vmin1, vmax=vmax1) if bad is not None: pyplot.imshow(badImage, alpha=0.2, interpolation='nearest', origin='lower') pyplot.axis("off") pyplot.title(title) pyplot.subplot(2, 3, n + 4) im2 = pyplot.imshow(image, cmap=pyplot.cm.gray, interpolation='nearest', origin='lower', vmin=vmin2, vmax=vmax2) if bad is not None: pyplot.imshow(badImage, alpha=0.2, interpolation='nearest', origin='lower') pyplot.axis("off") pyplot.title(title) pyplot.subplots_adjust(left=0.05, bottom=0.05, top=0.92, right=0.75, wspace=0.05, hspace=0.05) cax1 = pyplot.axes([0.8, 0.55, 0.05, 0.4]) pyplot.colorbar(im1, cax=cax1) cax2 = pyplot.axes([0.8, 0.05, 0.05, 0.4]) pyplot.colorbar(im2, cax=cax2) if plotFileName: pyplot.savefig(plotFileName) else: pyplot.show() @inTestCase
[docs]def assertFloatsAlmostEqual(testCase, lhs, rhs, rtol=sys.float_info.epsilon, atol=sys.float_info.epsilon, relTo=None, printFailures=True, plotOnFailure=False, plotFileName=None, invert=False, msg=None): """Highly-configurable floating point comparisons for scalars and arrays. The test assertion will fail if all elements lhs and rhs are not equal to within the tolerances specified by rtol and atol. More precisely, the comparison is: ``abs(lhs - rhs) <= relTo*rtol OR abs(lhs - rhs) <= atol`` If rtol or atol is None, that term in the comparison is not performed at all. When not specified, relTo is the elementwise maximum of the absolute values of lhs and rhs. If set manually, it should usually be set to either lhs or rhs, or a scalar value typical of what is expected. Parameters ---------- testCase : `unittest.TestCase` Instance the test is part of. lhs : scalar or array-like LHS value(s) to compare; may be a scalar or array-like of any dimension rhs : scalar or array-like RHS value(s) to compare; may be a scalar or array-like of any dimension rtol : `float` or None Relative tolerance for comparison; defaults to double-precision epsilon. atol : `float` or None Absolute tolerance for comparison; defaults to double-precision epsilon. relTo : `float` Value to which comparison with rtol is relative. printFailures : `bool` Upon failure, print all inequal elements as part of the message. plotOnFailure : `bool` Upon failure, plot the originals and their residual with matplotlib. Only 2-d arrays are supported. plotFileName : `str` Filename to save the plot to. If None, the plot will be displayed in a window. invert : `bool` If True, invert the comparison and fail only if any elements *are* equal. Used to implement assertFloatsNotEqual, which should generally be used instead for clarity. msg : `str` String to append to the error message when assert fails. """ if not numpy.isfinite(lhs).all(): testCase.fail("Non-finite values in lhs") if not numpy.isfinite(rhs).all(): testCase.fail("Non-finite values in rhs") diff = lhs - rhs absDiff = numpy.abs(lhs - rhs) if rtol is not None: if relTo is None: relTo = numpy.maximum(numpy.abs(lhs), numpy.abs(rhs)) else: relTo = numpy.abs(relTo) bad = absDiff > rtol*relTo if atol is not None: bad = numpy.logical_and(bad, absDiff > atol) else: if atol is None: raise ValueError("rtol and atol cannot both be None") bad = absDiff > atol failed = numpy.any(bad) if invert: failed = not failed bad = numpy.logical_not(bad) cmpStr = "==" failStr = "are the same" else: cmpStr = "!=" failStr = "differ" errMsg = [] if failed: if numpy.isscalar(bad): if rtol is None: errMsg = ["%s %s %s; diff=%s with atol=%s" % (lhs, cmpStr, rhs, absDiff, atol)] elif atol is None: errMsg = ["%s %s %s; diff=%s/%s=%s with rtol=%s" % (lhs, cmpStr, rhs, absDiff, relTo, absDiff/relTo, rtol)] else: errMsg = ["%s %s %s; diff=%s/%s=%s with rtol=%s, atol=%s" % (lhs, cmpStr, rhs, absDiff, relTo, absDiff/relTo, rtol, atol)] else: errMsg = ["%d/%d elements %s with rtol=%s, atol=%s" % (bad.sum(), bad.size, failStr, rtol, atol)] if plotOnFailure: if len(lhs.shape) != 2 or len(rhs.shape) != 2: raise ValueError("plotOnFailure is only valid for 2-d arrays") try: plotImageDiff(lhs, rhs, bad, diff=diff, plotFileName=plotFileName) except ImportError: errMsg.append("Failure plot requested but matplotlib could not be imported.") if printFailures: # Make sure everything is an array if any of them are, so we can treat # them the same (diff and absDiff are arrays if either rhs or lhs is), # and we don't get here if neither is. if numpy.isscalar(relTo): relTo = numpy.ones(bad.shape, dtype=float) * relTo if numpy.isscalar(lhs): lhs = numpy.ones(bad.shape, dtype=float) * lhs if numpy.isscalar(rhs): rhs = numpy.ones(bad.shape, dtype=float) * rhs if rtol is None: for a, b, diff in zip(lhs[bad], rhs[bad], absDiff[bad]): errMsg.append("%s %s %s (diff=%s)" % (a, cmpStr, b, diff)) else: for a, b, diff, rel in zip(lhs[bad], rhs[bad], absDiff[bad], relTo[bad]): errMsg.append("%s %s %s (diff=%s/%s=%s)" % (a, cmpStr, b, diff, rel, diff/rel)) if msg is not None: errMsg.append(msg) testCase.assertFalse(failed, msg="\n".join(errMsg))
@inTestCase
[docs]def assertFloatsNotEqual(testCase, lhs, rhs, **kwds): """ Fail a test if the given floating point values are equal to within the given tolerances. See assertFloatsAlmostEqual (called with rtol=atol=0) for more information. """ return assertFloatsAlmostEqual(testCase, lhs, rhs, invert=True, **kwds)
@inTestCase
[docs]def assertFloatsEqual(testCase, lhs, rhs, **kwargs): """ Assert that lhs == rhs (both numeric types, whether scalar or array). See assertFloatsAlmostEqual (called with rtol=atol=0) for more information. """ return assertFloatsAlmostEqual(testCase, lhs, rhs, rtol=0, atol=0, **kwargs)
@inTestCase def assertClose(*args, **kwargs): """.. note:: Deprecated in 12_0""" warnings.warn("assertClose is deprecated; please use TestCase.assertFloatsAlmostEqual", DeprecationWarning, stacklevel=2) return assertFloatsAlmostEqual(*args, **kwargs) @inTestCase def assertNotClose(*args, **kwargs): """.. note:: Deprecated in 12_0""" warnings.warn("assertNotClose is deprecated; please use TestCase.assertFloatsNotEqual", DeprecationWarning, stacklevel=2) return assertFloatsNotEqual(*args, **kwargs)