import sys import unittest from doctest import DocTestSuite from test import support from test.support import threading_helper import weakref import gc # Modules under test import _thread import threading import _threading_local class Weak(object): pass def target(local, weaklist): weak = Weak() local.weak = weak weaklist.append(weakref.ref(weak)) class BaseLocalTest: def test_local_refs(self): self._local_refs(20) self._local_refs(50) self._local_refs(100) def _local_refs(self, n): local = self._local() weaklist = [] for i in range(n): t = threading.Thread(target=target, args=(local, weaklist)) t.start() t.join() del t support.gc_collect() # For PyPy or other GCs. self.assertEqual(len(weaklist), n) # XXX _threading_local keeps the local of the last stopped thread alive. deadlist = [weak for weak in weaklist if weak() is None] self.assertIn(len(deadlist), (n-1, n)) # Assignment to the same thread local frees it sometimes (!) local.someothervar = None support.gc_collect() # For PyPy or other GCs. deadlist = [weak for weak in weaklist if weak() is None] self.assertIn(len(deadlist), (n-1, n), (n, len(deadlist))) def test_derived(self): # Issue 3088: if there is a threads switch inside the __init__ # of a threading.local derived class, the per-thread dictionary # is created but not correctly set on the object. # The first member set may be bogus. import time class Local(self._local): def __init__(self): time.sleep(0.01) local = Local() def f(i): local.x = i # Simply check that the variable is correctly set self.assertEqual(local.x, i) with threading_helper.start_threads(threading.Thread(target=f, args=(i,)) for i in range(10)): pass def test_derived_cycle_dealloc(self): # http://bugs.python.org/issue6990 class Local(self._local): pass locals = None passed = False e1 = threading.Event() e2 = threading.Event() def f(): nonlocal passed # 1) Involve Local in a cycle cycle = [Local()] cycle.append(cycle) cycle[0].foo = 'bar' # 2) GC the cycle (triggers threadmodule.c::local_clear # before local_dealloc) del cycle support.gc_collect() # For PyPy or other GCs. e1.set() e2.wait() # 4) New Locals should be empty passed = all(not hasattr(local, 'foo') for local in locals) t = threading.Thread(target=f) t.start() e1.wait() # 3) New Locals should recycle the original's address. Creating # them in the thread overwrites the thread state and avoids the # bug locals = [Local() for i in range(10)] e2.set() t.join() self.assertTrue(passed) def test_arguments(self): # Issue 1522237 class MyLocal(self._local): def __init__(self, *args, **kwargs): pass MyLocal(a=1) MyLocal(1) self.assertRaises(TypeError, self._local, a=1) self.assertRaises(TypeError, self._local, 1) def _test_one_class(self, c): self._failed = "No error message set or cleared." obj = c() e1 = threading.Event() e2 = threading.Event() def f1(): obj.x = 'foo' obj.y = 'bar' del obj.y e1.set() e2.wait() def f2(): try: foo = obj.x except AttributeError: # This is expected -- we haven't set obj.x in this thread yet! self._failed = "" # passed else: self._failed = ('Incorrectly got value %r from class %r\n' % (foo, c)) sys.stderr.write(self._failed) t1 = threading.Thread(target=f1) t1.start() e1.wait() t2 = threading.Thread(target=f2) t2.start() t2.join() # The test is done; just let t1 know it can exit, and wait for it. e2.set() t1.join() self.assertFalse(self._failed, self._failed) def test_threading_local(self): self._test_one_class(self._local) def test_threading_local_subclass(self): class LocalSubclass(self._local): """To test that subclasses behave properly.""" self._test_one_class(LocalSubclass) def _test_dict_attribute(self, cls): obj = cls() obj.x = 5 self.assertEqual(obj.__dict__, {'x': 5}) with self.assertRaises(AttributeError): obj.__dict__ = {} with self.assertRaises(AttributeError): del obj.__dict__ def test_dict_attribute(self): self._test_dict_attribute(self._local) def test_dict_attribute_subclass(self): class LocalSubclass(self._local): """To test that subclasses behave properly.""" self._test_dict_attribute(LocalSubclass) def test_cycle_collection(self): class X: pass x = X() x.local = self._local() x.local.x = x wr = weakref.ref(x) del x support.gc_collect() # For PyPy or other GCs. self.assertIsNone(wr()) class ThreadLocalTest(unittest.TestCase, BaseLocalTest): _local = _thread._local class PyThreadingLocalTest(unittest.TestCase, BaseLocalTest): _local = _threading_local.local def test_main(): suite = unittest.TestSuite() suite.addTest(DocTestSuite('_threading_local')) suite.addTest(unittest.makeSuite(ThreadLocalTest)) suite.addTest(unittest.makeSuite(PyThreadingLocalTest)) local_orig = _threading_local.local def setUp(test): _threading_local.local = _thread._local def tearDown(test): _threading_local.local = local_orig suite.addTest(DocTestSuite('_threading_local', setUp=setUp, tearDown=tearDown) ) support.run_unittest(suite) if __name__ == '__main__': test_main()