2338 lines
79 KiB
Python
2338 lines
79 KiB
Python
from collections import namedtuple
|
|
import contextlib
|
|
import itertools
|
|
import os
|
|
import pickle
|
|
import sys
|
|
from textwrap import dedent
|
|
import threading
|
|
import time
|
|
import unittest
|
|
|
|
from test import support
|
|
from test.support import script_helper
|
|
|
|
|
|
interpreters = support.import_module('_xxsubinterpreters')
|
|
|
|
|
|
##################################
|
|
# helpers
|
|
|
|
def _captured_script(script):
|
|
r, w = os.pipe()
|
|
indented = script.replace('\n', '\n ')
|
|
wrapped = dedent(f"""
|
|
import contextlib
|
|
with open({w}, 'w') as spipe:
|
|
with contextlib.redirect_stdout(spipe):
|
|
{indented}
|
|
""")
|
|
return wrapped, open(r)
|
|
|
|
|
|
def _run_output(interp, request, shared=None):
|
|
script, rpipe = _captured_script(request)
|
|
with rpipe:
|
|
interpreters.run_string(interp, script, shared)
|
|
return rpipe.read()
|
|
|
|
|
|
def _wait_for_interp_to_run(interp, timeout=None):
|
|
# bpo-37224: Running this test file in multiprocesses will fail randomly.
|
|
# The failure reason is that the thread can't acquire the cpu to
|
|
# run subinterpreter eariler than the main thread in multiprocess.
|
|
if timeout is None:
|
|
timeout = support.SHORT_TIMEOUT
|
|
start_time = time.monotonic()
|
|
deadline = start_time + timeout
|
|
while not interpreters.is_running(interp):
|
|
if time.monotonic() > deadline:
|
|
raise RuntimeError('interp is not running')
|
|
time.sleep(0.010)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def _running(interp):
|
|
r, w = os.pipe()
|
|
def run():
|
|
interpreters.run_string(interp, dedent(f"""
|
|
# wait for "signal"
|
|
with open({r}) as rpipe:
|
|
rpipe.read()
|
|
"""))
|
|
|
|
t = threading.Thread(target=run)
|
|
t.start()
|
|
_wait_for_interp_to_run(interp)
|
|
|
|
yield
|
|
|
|
with open(w, 'w') as spipe:
|
|
spipe.write('done')
|
|
t.join()
|
|
|
|
|
|
#@contextmanager
|
|
#def run_threaded(id, source, **shared):
|
|
# def run():
|
|
# run_interp(id, source, **shared)
|
|
# t = threading.Thread(target=run)
|
|
# t.start()
|
|
# yield
|
|
# t.join()
|
|
|
|
|
|
def run_interp(id, source, **shared):
|
|
_run_interp(id, source, shared)
|
|
|
|
|
|
def _run_interp(id, source, shared, _mainns={}):
|
|
source = dedent(source)
|
|
main = interpreters.get_main()
|
|
if main == id:
|
|
if interpreters.get_current() != main:
|
|
raise RuntimeError
|
|
# XXX Run a func?
|
|
exec(source, _mainns)
|
|
else:
|
|
interpreters.run_string(id, source, shared)
|
|
|
|
|
|
class Interpreter(namedtuple('Interpreter', 'name id')):
|
|
|
|
@classmethod
|
|
def from_raw(cls, raw):
|
|
if isinstance(raw, cls):
|
|
return raw
|
|
elif isinstance(raw, str):
|
|
return cls(raw)
|
|
else:
|
|
raise NotImplementedError
|
|
|
|
def __new__(cls, name=None, id=None):
|
|
main = interpreters.get_main()
|
|
if id == main:
|
|
if not name:
|
|
name = 'main'
|
|
elif name != 'main':
|
|
raise ValueError(
|
|
'name mismatch (expected "main", got "{}")'.format(name))
|
|
id = main
|
|
elif id is not None:
|
|
if not name:
|
|
name = 'interp'
|
|
elif name == 'main':
|
|
raise ValueError('name mismatch (unexpected "main")')
|
|
if not isinstance(id, interpreters.InterpreterID):
|
|
id = interpreters.InterpreterID(id)
|
|
elif not name or name == 'main':
|
|
name = 'main'
|
|
id = main
|
|
else:
|
|
id = interpreters.create()
|
|
self = super().__new__(cls, name, id)
|
|
return self
|
|
|
|
|
|
# XXX expect_channel_closed() is unnecessary once we improve exc propagation.
|
|
|
|
@contextlib.contextmanager
|
|
def expect_channel_closed():
|
|
try:
|
|
yield
|
|
except interpreters.ChannelClosedError:
|
|
pass
|
|
else:
|
|
assert False, 'channel not closed'
|
|
|
|
|
|
class ChannelAction(namedtuple('ChannelAction', 'action end interp')):
|
|
|
|
def __new__(cls, action, end=None, interp=None):
|
|
if not end:
|
|
end = 'both'
|
|
if not interp:
|
|
interp = 'main'
|
|
self = super().__new__(cls, action, end, interp)
|
|
return self
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
if self.action == 'use':
|
|
if self.end not in ('same', 'opposite', 'send', 'recv'):
|
|
raise ValueError(self.end)
|
|
elif self.action in ('close', 'force-close'):
|
|
if self.end not in ('both', 'same', 'opposite', 'send', 'recv'):
|
|
raise ValueError(self.end)
|
|
else:
|
|
raise ValueError(self.action)
|
|
if self.interp not in ('main', 'same', 'other', 'extra'):
|
|
raise ValueError(self.interp)
|
|
|
|
def resolve_end(self, end):
|
|
if self.end == 'same':
|
|
return end
|
|
elif self.end == 'opposite':
|
|
return 'recv' if end == 'send' else 'send'
|
|
else:
|
|
return self.end
|
|
|
|
def resolve_interp(self, interp, other, extra):
|
|
if self.interp == 'same':
|
|
return interp
|
|
elif self.interp == 'other':
|
|
if other is None:
|
|
raise RuntimeError
|
|
return other
|
|
elif self.interp == 'extra':
|
|
if extra is None:
|
|
raise RuntimeError
|
|
return extra
|
|
elif self.interp == 'main':
|
|
if interp.name == 'main':
|
|
return interp
|
|
elif other and other.name == 'main':
|
|
return other
|
|
else:
|
|
raise RuntimeError
|
|
# Per __init__(), there aren't any others.
|
|
|
|
|
|
class ChannelState(namedtuple('ChannelState', 'pending closed')):
|
|
|
|
def __new__(cls, pending=0, *, closed=False):
|
|
self = super().__new__(cls, pending, closed)
|
|
return self
|
|
|
|
def incr(self):
|
|
return type(self)(self.pending + 1, closed=self.closed)
|
|
|
|
def decr(self):
|
|
return type(self)(self.pending - 1, closed=self.closed)
|
|
|
|
def close(self, *, force=True):
|
|
if self.closed:
|
|
if not force or self.pending == 0:
|
|
return self
|
|
return type(self)(0 if force else self.pending, closed=True)
|
|
|
|
|
|
def run_action(cid, action, end, state, *, hideclosed=True):
|
|
if state.closed:
|
|
if action == 'use' and end == 'recv' and state.pending:
|
|
expectfail = False
|
|
else:
|
|
expectfail = True
|
|
else:
|
|
expectfail = False
|
|
|
|
try:
|
|
result = _run_action(cid, action, end, state)
|
|
except interpreters.ChannelClosedError:
|
|
if not hideclosed and not expectfail:
|
|
raise
|
|
result = state.close()
|
|
else:
|
|
if expectfail:
|
|
raise ... # XXX
|
|
return result
|
|
|
|
|
|
def _run_action(cid, action, end, state):
|
|
if action == 'use':
|
|
if end == 'send':
|
|
interpreters.channel_send(cid, b'spam')
|
|
return state.incr()
|
|
elif end == 'recv':
|
|
if not state.pending:
|
|
try:
|
|
interpreters.channel_recv(cid)
|
|
except interpreters.ChannelEmptyError:
|
|
return state
|
|
else:
|
|
raise Exception('expected ChannelEmptyError')
|
|
else:
|
|
interpreters.channel_recv(cid)
|
|
return state.decr()
|
|
else:
|
|
raise ValueError(end)
|
|
elif action == 'close':
|
|
kwargs = {}
|
|
if end in ('recv', 'send'):
|
|
kwargs[end] = True
|
|
interpreters.channel_close(cid, **kwargs)
|
|
return state.close()
|
|
elif action == 'force-close':
|
|
kwargs = {
|
|
'force': True,
|
|
}
|
|
if end in ('recv', 'send'):
|
|
kwargs[end] = True
|
|
interpreters.channel_close(cid, **kwargs)
|
|
return state.close(force=True)
|
|
else:
|
|
raise ValueError(action)
|
|
|
|
|
|
def clean_up_interpreters():
|
|
for id in interpreters.list_all():
|
|
if id == 0: # main
|
|
continue
|
|
try:
|
|
interpreters.destroy(id)
|
|
except RuntimeError:
|
|
pass # already destroyed
|
|
|
|
|
|
def clean_up_channels():
|
|
for cid in interpreters.channel_list_all():
|
|
try:
|
|
interpreters.channel_destroy(cid)
|
|
except interpreters.ChannelNotFoundError:
|
|
pass # already destroyed
|
|
|
|
|
|
class TestBase(unittest.TestCase):
|
|
|
|
def tearDown(self):
|
|
clean_up_interpreters()
|
|
clean_up_channels()
|
|
|
|
|
|
##################################
|
|
# misc. tests
|
|
|
|
class IsShareableTests(unittest.TestCase):
|
|
|
|
def test_default_shareables(self):
|
|
shareables = [
|
|
# singletons
|
|
None,
|
|
# builtin objects
|
|
b'spam',
|
|
'spam',
|
|
10,
|
|
-10,
|
|
]
|
|
for obj in shareables:
|
|
with self.subTest(obj):
|
|
self.assertTrue(
|
|
interpreters.is_shareable(obj))
|
|
|
|
def test_not_shareable(self):
|
|
class Cheese:
|
|
def __init__(self, name):
|
|
self.name = name
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
class SubBytes(bytes):
|
|
"""A subclass of a shareable type."""
|
|
|
|
not_shareables = [
|
|
# singletons
|
|
True,
|
|
False,
|
|
NotImplemented,
|
|
...,
|
|
# builtin types and objects
|
|
type,
|
|
object,
|
|
object(),
|
|
Exception(),
|
|
100.0,
|
|
# user-defined types and objects
|
|
Cheese,
|
|
Cheese('Wensleydale'),
|
|
SubBytes(b'spam'),
|
|
]
|
|
for obj in not_shareables:
|
|
with self.subTest(repr(obj)):
|
|
self.assertFalse(
|
|
interpreters.is_shareable(obj))
|
|
|
|
|
|
class ShareableTypeTests(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.cid = interpreters.channel_create()
|
|
|
|
def tearDown(self):
|
|
interpreters.channel_destroy(self.cid)
|
|
super().tearDown()
|
|
|
|
def _assert_values(self, values):
|
|
for obj in values:
|
|
with self.subTest(obj):
|
|
interpreters.channel_send(self.cid, obj)
|
|
got = interpreters.channel_recv(self.cid)
|
|
|
|
self.assertEqual(got, obj)
|
|
self.assertIs(type(got), type(obj))
|
|
# XXX Check the following in the channel tests?
|
|
#self.assertIsNot(got, obj)
|
|
|
|
def test_singletons(self):
|
|
for obj in [None]:
|
|
with self.subTest(obj):
|
|
interpreters.channel_send(self.cid, obj)
|
|
got = interpreters.channel_recv(self.cid)
|
|
|
|
# XXX What about between interpreters?
|
|
self.assertIs(got, obj)
|
|
|
|
def test_types(self):
|
|
self._assert_values([
|
|
b'spam',
|
|
9999,
|
|
self.cid,
|
|
])
|
|
|
|
def test_bytes(self):
|
|
self._assert_values(i.to_bytes(2, 'little', signed=True)
|
|
for i in range(-1, 258))
|
|
|
|
def test_strs(self):
|
|
self._assert_values(['hello world', '你好世界', ''])
|
|
|
|
def test_int(self):
|
|
self._assert_values(itertools.chain(range(-1, 258),
|
|
[sys.maxsize, -sys.maxsize - 1]))
|
|
|
|
def test_non_shareable_int(self):
|
|
ints = [
|
|
sys.maxsize + 1,
|
|
-sys.maxsize - 2,
|
|
2**1000,
|
|
]
|
|
for i in ints:
|
|
with self.subTest(i):
|
|
with self.assertRaises(OverflowError):
|
|
interpreters.channel_send(self.cid, i)
|
|
|
|
|
|
##################################
|
|
# interpreter tests
|
|
|
|
class ListAllTests(TestBase):
|
|
|
|
def test_initial(self):
|
|
main = interpreters.get_main()
|
|
ids = interpreters.list_all()
|
|
self.assertEqual(ids, [main])
|
|
|
|
def test_after_creating(self):
|
|
main = interpreters.get_main()
|
|
first = interpreters.create()
|
|
second = interpreters.create()
|
|
ids = interpreters.list_all()
|
|
self.assertEqual(ids, [main, first, second])
|
|
|
|
def test_after_destroying(self):
|
|
main = interpreters.get_main()
|
|
first = interpreters.create()
|
|
second = interpreters.create()
|
|
interpreters.destroy(first)
|
|
ids = interpreters.list_all()
|
|
self.assertEqual(ids, [main, second])
|
|
|
|
|
|
class GetCurrentTests(TestBase):
|
|
|
|
def test_main(self):
|
|
main = interpreters.get_main()
|
|
cur = interpreters.get_current()
|
|
self.assertEqual(cur, main)
|
|
self.assertIsInstance(cur, interpreters.InterpreterID)
|
|
|
|
def test_subinterpreter(self):
|
|
main = interpreters.get_main()
|
|
interp = interpreters.create()
|
|
out = _run_output(interp, dedent("""
|
|
import _xxsubinterpreters as _interpreters
|
|
cur = _interpreters.get_current()
|
|
print(cur)
|
|
assert isinstance(cur, _interpreters.InterpreterID)
|
|
"""))
|
|
cur = int(out.strip())
|
|
_, expected = interpreters.list_all()
|
|
self.assertEqual(cur, expected)
|
|
self.assertNotEqual(cur, main)
|
|
|
|
|
|
class GetMainTests(TestBase):
|
|
|
|
def test_from_main(self):
|
|
[expected] = interpreters.list_all()
|
|
main = interpreters.get_main()
|
|
self.assertEqual(main, expected)
|
|
self.assertIsInstance(main, interpreters.InterpreterID)
|
|
|
|
def test_from_subinterpreter(self):
|
|
[expected] = interpreters.list_all()
|
|
interp = interpreters.create()
|
|
out = _run_output(interp, dedent("""
|
|
import _xxsubinterpreters as _interpreters
|
|
main = _interpreters.get_main()
|
|
print(main)
|
|
assert isinstance(main, _interpreters.InterpreterID)
|
|
"""))
|
|
main = int(out.strip())
|
|
self.assertEqual(main, expected)
|
|
|
|
|
|
class IsRunningTests(TestBase):
|
|
|
|
def test_main(self):
|
|
main = interpreters.get_main()
|
|
self.assertTrue(interpreters.is_running(main))
|
|
|
|
def test_subinterpreter(self):
|
|
interp = interpreters.create()
|
|
self.assertFalse(interpreters.is_running(interp))
|
|
|
|
with _running(interp):
|
|
self.assertTrue(interpreters.is_running(interp))
|
|
self.assertFalse(interpreters.is_running(interp))
|
|
|
|
def test_from_subinterpreter(self):
|
|
interp = interpreters.create()
|
|
out = _run_output(interp, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
if _interpreters.is_running({interp}):
|
|
print(True)
|
|
else:
|
|
print(False)
|
|
"""))
|
|
self.assertEqual(out.strip(), 'True')
|
|
|
|
def test_already_destroyed(self):
|
|
interp = interpreters.create()
|
|
interpreters.destroy(interp)
|
|
with self.assertRaises(RuntimeError):
|
|
interpreters.is_running(interp)
|
|
|
|
def test_does_not_exist(self):
|
|
with self.assertRaises(RuntimeError):
|
|
interpreters.is_running(1_000_000)
|
|
|
|
def test_bad_id(self):
|
|
with self.assertRaises(ValueError):
|
|
interpreters.is_running(-1)
|
|
|
|
|
|
class InterpreterIDTests(TestBase):
|
|
|
|
def test_with_int(self):
|
|
id = interpreters.InterpreterID(10, force=True)
|
|
|
|
self.assertEqual(int(id), 10)
|
|
|
|
def test_coerce_id(self):
|
|
class Int(str):
|
|
def __index__(self):
|
|
return 10
|
|
|
|
id = interpreters.InterpreterID(Int(), force=True)
|
|
self.assertEqual(int(id), 10)
|
|
|
|
def test_bad_id(self):
|
|
self.assertRaises(TypeError, interpreters.InterpreterID, object())
|
|
self.assertRaises(TypeError, interpreters.InterpreterID, 10.0)
|
|
self.assertRaises(TypeError, interpreters.InterpreterID, '10')
|
|
self.assertRaises(TypeError, interpreters.InterpreterID, b'10')
|
|
self.assertRaises(ValueError, interpreters.InterpreterID, -1)
|
|
self.assertRaises(OverflowError, interpreters.InterpreterID, 2**64)
|
|
|
|
def test_does_not_exist(self):
|
|
id = interpreters.channel_create()
|
|
with self.assertRaises(RuntimeError):
|
|
interpreters.InterpreterID(int(id) + 1) # unforced
|
|
|
|
def test_str(self):
|
|
id = interpreters.InterpreterID(10, force=True)
|
|
self.assertEqual(str(id), '10')
|
|
|
|
def test_repr(self):
|
|
id = interpreters.InterpreterID(10, force=True)
|
|
self.assertEqual(repr(id), 'InterpreterID(10)')
|
|
|
|
def test_equality(self):
|
|
id1 = interpreters.create()
|
|
id2 = interpreters.InterpreterID(int(id1))
|
|
id3 = interpreters.create()
|
|
|
|
self.assertTrue(id1 == id1)
|
|
self.assertTrue(id1 == id2)
|
|
self.assertTrue(id1 == int(id1))
|
|
self.assertTrue(int(id1) == id1)
|
|
self.assertTrue(id1 == float(int(id1)))
|
|
self.assertTrue(float(int(id1)) == id1)
|
|
self.assertFalse(id1 == float(int(id1)) + 0.1)
|
|
self.assertFalse(id1 == str(int(id1)))
|
|
self.assertFalse(id1 == 2**1000)
|
|
self.assertFalse(id1 == float('inf'))
|
|
self.assertFalse(id1 == 'spam')
|
|
self.assertFalse(id1 == id3)
|
|
|
|
self.assertFalse(id1 != id1)
|
|
self.assertFalse(id1 != id2)
|
|
self.assertTrue(id1 != id3)
|
|
|
|
|
|
class CreateTests(TestBase):
|
|
|
|
def test_in_main(self):
|
|
id = interpreters.create()
|
|
self.assertIsInstance(id, interpreters.InterpreterID)
|
|
|
|
self.assertIn(id, interpreters.list_all())
|
|
|
|
@unittest.skip('enable this test when working on pystate.c')
|
|
def test_unique_id(self):
|
|
seen = set()
|
|
for _ in range(100):
|
|
id = interpreters.create()
|
|
interpreters.destroy(id)
|
|
seen.add(id)
|
|
|
|
self.assertEqual(len(seen), 100)
|
|
|
|
def test_in_thread(self):
|
|
lock = threading.Lock()
|
|
id = None
|
|
def f():
|
|
nonlocal id
|
|
id = interpreters.create()
|
|
lock.acquire()
|
|
lock.release()
|
|
|
|
t = threading.Thread(target=f)
|
|
with lock:
|
|
t.start()
|
|
t.join()
|
|
self.assertIn(id, interpreters.list_all())
|
|
|
|
def test_in_subinterpreter(self):
|
|
main, = interpreters.list_all()
|
|
id1 = interpreters.create()
|
|
out = _run_output(id1, dedent("""
|
|
import _xxsubinterpreters as _interpreters
|
|
id = _interpreters.create()
|
|
print(id)
|
|
assert isinstance(id, _interpreters.InterpreterID)
|
|
"""))
|
|
id2 = int(out.strip())
|
|
|
|
self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
|
|
|
|
def test_in_threaded_subinterpreter(self):
|
|
main, = interpreters.list_all()
|
|
id1 = interpreters.create()
|
|
id2 = None
|
|
def f():
|
|
nonlocal id2
|
|
out = _run_output(id1, dedent("""
|
|
import _xxsubinterpreters as _interpreters
|
|
id = _interpreters.create()
|
|
print(id)
|
|
"""))
|
|
id2 = int(out.strip())
|
|
|
|
t = threading.Thread(target=f)
|
|
t.start()
|
|
t.join()
|
|
|
|
self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
|
|
|
|
def test_after_destroy_all(self):
|
|
before = set(interpreters.list_all())
|
|
# Create 3 subinterpreters.
|
|
ids = []
|
|
for _ in range(3):
|
|
id = interpreters.create()
|
|
ids.append(id)
|
|
# Now destroy them.
|
|
for id in ids:
|
|
interpreters.destroy(id)
|
|
# Finally, create another.
|
|
id = interpreters.create()
|
|
self.assertEqual(set(interpreters.list_all()), before | {id})
|
|
|
|
def test_after_destroy_some(self):
|
|
before = set(interpreters.list_all())
|
|
# Create 3 subinterpreters.
|
|
id1 = interpreters.create()
|
|
id2 = interpreters.create()
|
|
id3 = interpreters.create()
|
|
# Now destroy 2 of them.
|
|
interpreters.destroy(id1)
|
|
interpreters.destroy(id3)
|
|
# Finally, create another.
|
|
id = interpreters.create()
|
|
self.assertEqual(set(interpreters.list_all()), before | {id, id2})
|
|
|
|
|
|
class DestroyTests(TestBase):
|
|
|
|
def test_one(self):
|
|
id1 = interpreters.create()
|
|
id2 = interpreters.create()
|
|
id3 = interpreters.create()
|
|
self.assertIn(id2, interpreters.list_all())
|
|
interpreters.destroy(id2)
|
|
self.assertNotIn(id2, interpreters.list_all())
|
|
self.assertIn(id1, interpreters.list_all())
|
|
self.assertIn(id3, interpreters.list_all())
|
|
|
|
def test_all(self):
|
|
before = set(interpreters.list_all())
|
|
ids = set()
|
|
for _ in range(3):
|
|
id = interpreters.create()
|
|
ids.add(id)
|
|
self.assertEqual(set(interpreters.list_all()), before | ids)
|
|
for id in ids:
|
|
interpreters.destroy(id)
|
|
self.assertEqual(set(interpreters.list_all()), before)
|
|
|
|
def test_main(self):
|
|
main, = interpreters.list_all()
|
|
with self.assertRaises(RuntimeError):
|
|
interpreters.destroy(main)
|
|
|
|
def f():
|
|
with self.assertRaises(RuntimeError):
|
|
interpreters.destroy(main)
|
|
|
|
t = threading.Thread(target=f)
|
|
t.start()
|
|
t.join()
|
|
|
|
def test_already_destroyed(self):
|
|
id = interpreters.create()
|
|
interpreters.destroy(id)
|
|
with self.assertRaises(RuntimeError):
|
|
interpreters.destroy(id)
|
|
|
|
def test_does_not_exist(self):
|
|
with self.assertRaises(RuntimeError):
|
|
interpreters.destroy(1_000_000)
|
|
|
|
def test_bad_id(self):
|
|
with self.assertRaises(ValueError):
|
|
interpreters.destroy(-1)
|
|
|
|
def test_from_current(self):
|
|
main, = interpreters.list_all()
|
|
id = interpreters.create()
|
|
script = dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
try:
|
|
_interpreters.destroy({id})
|
|
except RuntimeError:
|
|
pass
|
|
""")
|
|
|
|
interpreters.run_string(id, script)
|
|
self.assertEqual(set(interpreters.list_all()), {main, id})
|
|
|
|
def test_from_sibling(self):
|
|
main, = interpreters.list_all()
|
|
id1 = interpreters.create()
|
|
id2 = interpreters.create()
|
|
script = dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
_interpreters.destroy({id2})
|
|
""")
|
|
interpreters.run_string(id1, script)
|
|
|
|
self.assertEqual(set(interpreters.list_all()), {main, id1})
|
|
|
|
def test_from_other_thread(self):
|
|
id = interpreters.create()
|
|
def f():
|
|
interpreters.destroy(id)
|
|
|
|
t = threading.Thread(target=f)
|
|
t.start()
|
|
t.join()
|
|
|
|
def test_still_running(self):
|
|
main, = interpreters.list_all()
|
|
interp = interpreters.create()
|
|
with _running(interp):
|
|
self.assertTrue(interpreters.is_running(interp),
|
|
msg=f"Interp {interp} should be running before destruction.")
|
|
|
|
with self.assertRaises(RuntimeError,
|
|
msg=f"Should not be able to destroy interp {interp} while it's still running."):
|
|
interpreters.destroy(interp)
|
|
self.assertTrue(interpreters.is_running(interp))
|
|
|
|
|
|
class RunStringTests(TestBase):
|
|
|
|
SCRIPT = dedent("""
|
|
with open('{}', 'w') as out:
|
|
out.write('{}')
|
|
""")
|
|
FILENAME = 'spam'
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.id = interpreters.create()
|
|
self._fs = None
|
|
|
|
def tearDown(self):
|
|
if self._fs is not None:
|
|
self._fs.close()
|
|
super().tearDown()
|
|
|
|
def test_success(self):
|
|
script, file = _captured_script('print("it worked!", end="")')
|
|
with file:
|
|
interpreters.run_string(self.id, script)
|
|
out = file.read()
|
|
|
|
self.assertEqual(out, 'it worked!')
|
|
|
|
def test_in_thread(self):
|
|
script, file = _captured_script('print("it worked!", end="")')
|
|
with file:
|
|
def f():
|
|
interpreters.run_string(self.id, script)
|
|
|
|
t = threading.Thread(target=f)
|
|
t.start()
|
|
t.join()
|
|
out = file.read()
|
|
|
|
self.assertEqual(out, 'it worked!')
|
|
|
|
def test_create_thread(self):
|
|
subinterp = interpreters.create(isolated=False)
|
|
script, file = _captured_script("""
|
|
import threading
|
|
def f():
|
|
print('it worked!', end='')
|
|
|
|
t = threading.Thread(target=f)
|
|
t.start()
|
|
t.join()
|
|
""")
|
|
with file:
|
|
interpreters.run_string(subinterp, script)
|
|
out = file.read()
|
|
|
|
self.assertEqual(out, 'it worked!')
|
|
|
|
@unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
|
|
def test_fork(self):
|
|
import tempfile
|
|
with tempfile.NamedTemporaryFile('w+') as file:
|
|
file.write('')
|
|
file.flush()
|
|
|
|
expected = 'spam spam spam spam spam'
|
|
script = dedent(f"""
|
|
import os
|
|
try:
|
|
os.fork()
|
|
except RuntimeError:
|
|
with open('{file.name}', 'w') as out:
|
|
out.write('{expected}')
|
|
""")
|
|
interpreters.run_string(self.id, script)
|
|
|
|
file.seek(0)
|
|
content = file.read()
|
|
self.assertEqual(content, expected)
|
|
|
|
def test_already_running(self):
|
|
with _running(self.id):
|
|
with self.assertRaises(RuntimeError):
|
|
interpreters.run_string(self.id, 'print("spam")')
|
|
|
|
def test_does_not_exist(self):
|
|
id = 0
|
|
while id in interpreters.list_all():
|
|
id += 1
|
|
with self.assertRaises(RuntimeError):
|
|
interpreters.run_string(id, 'print("spam")')
|
|
|
|
def test_error_id(self):
|
|
with self.assertRaises(ValueError):
|
|
interpreters.run_string(-1, 'print("spam")')
|
|
|
|
def test_bad_id(self):
|
|
with self.assertRaises(TypeError):
|
|
interpreters.run_string('spam', 'print("spam")')
|
|
|
|
def test_bad_script(self):
|
|
with self.assertRaises(TypeError):
|
|
interpreters.run_string(self.id, 10)
|
|
|
|
def test_bytes_for_script(self):
|
|
with self.assertRaises(TypeError):
|
|
interpreters.run_string(self.id, b'print("spam")')
|
|
|
|
@contextlib.contextmanager
|
|
def assert_run_failed(self, exctype, msg=None):
|
|
with self.assertRaises(interpreters.RunFailedError) as caught:
|
|
yield
|
|
if msg is None:
|
|
self.assertEqual(str(caught.exception).split(':')[0],
|
|
str(exctype))
|
|
else:
|
|
self.assertEqual(str(caught.exception),
|
|
"{}: {}".format(exctype, msg))
|
|
|
|
def test_invalid_syntax(self):
|
|
with self.assert_run_failed(SyntaxError):
|
|
# missing close paren
|
|
interpreters.run_string(self.id, 'print("spam"')
|
|
|
|
def test_failure(self):
|
|
with self.assert_run_failed(Exception, 'spam'):
|
|
interpreters.run_string(self.id, 'raise Exception("spam")')
|
|
|
|
def test_SystemExit(self):
|
|
with self.assert_run_failed(SystemExit, '42'):
|
|
interpreters.run_string(self.id, 'raise SystemExit(42)')
|
|
|
|
def test_sys_exit(self):
|
|
with self.assert_run_failed(SystemExit):
|
|
interpreters.run_string(self.id, dedent("""
|
|
import sys
|
|
sys.exit()
|
|
"""))
|
|
|
|
with self.assert_run_failed(SystemExit, '42'):
|
|
interpreters.run_string(self.id, dedent("""
|
|
import sys
|
|
sys.exit(42)
|
|
"""))
|
|
|
|
def test_with_shared(self):
|
|
r, w = os.pipe()
|
|
|
|
shared = {
|
|
'spam': b'ham',
|
|
'eggs': b'-1',
|
|
'cheddar': None,
|
|
}
|
|
script = dedent(f"""
|
|
eggs = int(eggs)
|
|
spam = 42
|
|
result = spam + eggs
|
|
|
|
ns = dict(vars())
|
|
del ns['__builtins__']
|
|
import pickle
|
|
with open({w}, 'wb') as chan:
|
|
pickle.dump(ns, chan)
|
|
""")
|
|
interpreters.run_string(self.id, script, shared)
|
|
with open(r, 'rb') as chan:
|
|
ns = pickle.load(chan)
|
|
|
|
self.assertEqual(ns['spam'], 42)
|
|
self.assertEqual(ns['eggs'], -1)
|
|
self.assertEqual(ns['result'], 41)
|
|
self.assertIsNone(ns['cheddar'])
|
|
|
|
def test_shared_overwrites(self):
|
|
interpreters.run_string(self.id, dedent("""
|
|
spam = 'eggs'
|
|
ns1 = dict(vars())
|
|
del ns1['__builtins__']
|
|
"""))
|
|
|
|
shared = {'spam': b'ham'}
|
|
script = dedent(f"""
|
|
ns2 = dict(vars())
|
|
del ns2['__builtins__']
|
|
""")
|
|
interpreters.run_string(self.id, script, shared)
|
|
|
|
r, w = os.pipe()
|
|
script = dedent(f"""
|
|
ns = dict(vars())
|
|
del ns['__builtins__']
|
|
import pickle
|
|
with open({w}, 'wb') as chan:
|
|
pickle.dump(ns, chan)
|
|
""")
|
|
interpreters.run_string(self.id, script)
|
|
with open(r, 'rb') as chan:
|
|
ns = pickle.load(chan)
|
|
|
|
self.assertEqual(ns['ns1']['spam'], 'eggs')
|
|
self.assertEqual(ns['ns2']['spam'], b'ham')
|
|
self.assertEqual(ns['spam'], b'ham')
|
|
|
|
def test_shared_overwrites_default_vars(self):
|
|
r, w = os.pipe()
|
|
|
|
shared = {'__name__': b'not __main__'}
|
|
script = dedent(f"""
|
|
spam = 42
|
|
|
|
ns = dict(vars())
|
|
del ns['__builtins__']
|
|
import pickle
|
|
with open({w}, 'wb') as chan:
|
|
pickle.dump(ns, chan)
|
|
""")
|
|
interpreters.run_string(self.id, script, shared)
|
|
with open(r, 'rb') as chan:
|
|
ns = pickle.load(chan)
|
|
|
|
self.assertEqual(ns['__name__'], b'not __main__')
|
|
|
|
def test_main_reused(self):
|
|
r, w = os.pipe()
|
|
interpreters.run_string(self.id, dedent(f"""
|
|
spam = True
|
|
|
|
ns = dict(vars())
|
|
del ns['__builtins__']
|
|
import pickle
|
|
with open({w}, 'wb') as chan:
|
|
pickle.dump(ns, chan)
|
|
del ns, pickle, chan
|
|
"""))
|
|
with open(r, 'rb') as chan:
|
|
ns1 = pickle.load(chan)
|
|
|
|
r, w = os.pipe()
|
|
interpreters.run_string(self.id, dedent(f"""
|
|
eggs = False
|
|
|
|
ns = dict(vars())
|
|
del ns['__builtins__']
|
|
import pickle
|
|
with open({w}, 'wb') as chan:
|
|
pickle.dump(ns, chan)
|
|
"""))
|
|
with open(r, 'rb') as chan:
|
|
ns2 = pickle.load(chan)
|
|
|
|
self.assertIn('spam', ns1)
|
|
self.assertNotIn('eggs', ns1)
|
|
self.assertIn('eggs', ns2)
|
|
self.assertIn('spam', ns2)
|
|
|
|
def test_execution_namespace_is_main(self):
|
|
r, w = os.pipe()
|
|
|
|
script = dedent(f"""
|
|
spam = 42
|
|
|
|
ns = dict(vars())
|
|
ns['__builtins__'] = str(ns['__builtins__'])
|
|
import pickle
|
|
with open({w}, 'wb') as chan:
|
|
pickle.dump(ns, chan)
|
|
""")
|
|
interpreters.run_string(self.id, script)
|
|
with open(r, 'rb') as chan:
|
|
ns = pickle.load(chan)
|
|
|
|
ns.pop('__builtins__')
|
|
ns.pop('__loader__')
|
|
self.assertEqual(ns, {
|
|
'__name__': '__main__',
|
|
'__annotations__': {},
|
|
'__doc__': None,
|
|
'__package__': None,
|
|
'__spec__': None,
|
|
'spam': 42,
|
|
})
|
|
|
|
# XXX Fix this test!
|
|
@unittest.skip('blocking forever')
|
|
def test_still_running_at_exit(self):
|
|
script = dedent(f"""
|
|
from textwrap import dedent
|
|
import threading
|
|
import _xxsubinterpreters as _interpreters
|
|
id = _interpreters.create()
|
|
def f():
|
|
_interpreters.run_string(id, dedent('''
|
|
import time
|
|
# Give plenty of time for the main interpreter to finish.
|
|
time.sleep(1_000_000)
|
|
'''))
|
|
|
|
t = threading.Thread(target=f)
|
|
t.start()
|
|
""")
|
|
with support.temp_dir() as dirname:
|
|
filename = script_helper.make_script(dirname, 'interp', script)
|
|
with script_helper.spawn_python(filename) as proc:
|
|
retcode = proc.wait()
|
|
|
|
self.assertEqual(retcode, 0)
|
|
|
|
|
|
##################################
|
|
# channel tests
|
|
|
|
class ChannelIDTests(TestBase):
|
|
|
|
def test_default_kwargs(self):
|
|
cid = interpreters._channel_id(10, force=True)
|
|
|
|
self.assertEqual(int(cid), 10)
|
|
self.assertEqual(cid.end, 'both')
|
|
|
|
def test_with_kwargs(self):
|
|
cid = interpreters._channel_id(10, send=True, force=True)
|
|
self.assertEqual(cid.end, 'send')
|
|
|
|
cid = interpreters._channel_id(10, send=True, recv=False, force=True)
|
|
self.assertEqual(cid.end, 'send')
|
|
|
|
cid = interpreters._channel_id(10, recv=True, force=True)
|
|
self.assertEqual(cid.end, 'recv')
|
|
|
|
cid = interpreters._channel_id(10, recv=True, send=False, force=True)
|
|
self.assertEqual(cid.end, 'recv')
|
|
|
|
cid = interpreters._channel_id(10, send=True, recv=True, force=True)
|
|
self.assertEqual(cid.end, 'both')
|
|
|
|
def test_coerce_id(self):
|
|
class Int(str):
|
|
def __index__(self):
|
|
return 10
|
|
|
|
cid = interpreters._channel_id(Int(), force=True)
|
|
self.assertEqual(int(cid), 10)
|
|
|
|
def test_bad_id(self):
|
|
self.assertRaises(TypeError, interpreters._channel_id, object())
|
|
self.assertRaises(TypeError, interpreters._channel_id, 10.0)
|
|
self.assertRaises(TypeError, interpreters._channel_id, '10')
|
|
self.assertRaises(TypeError, interpreters._channel_id, b'10')
|
|
self.assertRaises(ValueError, interpreters._channel_id, -1)
|
|
self.assertRaises(OverflowError, interpreters._channel_id, 2**64)
|
|
|
|
def test_bad_kwargs(self):
|
|
with self.assertRaises(ValueError):
|
|
interpreters._channel_id(10, send=False, recv=False)
|
|
|
|
def test_does_not_exist(self):
|
|
cid = interpreters.channel_create()
|
|
with self.assertRaises(interpreters.ChannelNotFoundError):
|
|
interpreters._channel_id(int(cid) + 1) # unforced
|
|
|
|
def test_str(self):
|
|
cid = interpreters._channel_id(10, force=True)
|
|
self.assertEqual(str(cid), '10')
|
|
|
|
def test_repr(self):
|
|
cid = interpreters._channel_id(10, force=True)
|
|
self.assertEqual(repr(cid), 'ChannelID(10)')
|
|
|
|
cid = interpreters._channel_id(10, send=True, force=True)
|
|
self.assertEqual(repr(cid), 'ChannelID(10, send=True)')
|
|
|
|
cid = interpreters._channel_id(10, recv=True, force=True)
|
|
self.assertEqual(repr(cid), 'ChannelID(10, recv=True)')
|
|
|
|
cid = interpreters._channel_id(10, send=True, recv=True, force=True)
|
|
self.assertEqual(repr(cid), 'ChannelID(10)')
|
|
|
|
def test_equality(self):
|
|
cid1 = interpreters.channel_create()
|
|
cid2 = interpreters._channel_id(int(cid1))
|
|
cid3 = interpreters.channel_create()
|
|
|
|
self.assertTrue(cid1 == cid1)
|
|
self.assertTrue(cid1 == cid2)
|
|
self.assertTrue(cid1 == int(cid1))
|
|
self.assertTrue(int(cid1) == cid1)
|
|
self.assertTrue(cid1 == float(int(cid1)))
|
|
self.assertTrue(float(int(cid1)) == cid1)
|
|
self.assertFalse(cid1 == float(int(cid1)) + 0.1)
|
|
self.assertFalse(cid1 == str(int(cid1)))
|
|
self.assertFalse(cid1 == 2**1000)
|
|
self.assertFalse(cid1 == float('inf'))
|
|
self.assertFalse(cid1 == 'spam')
|
|
self.assertFalse(cid1 == cid3)
|
|
|
|
self.assertFalse(cid1 != cid1)
|
|
self.assertFalse(cid1 != cid2)
|
|
self.assertTrue(cid1 != cid3)
|
|
|
|
|
|
class ChannelTests(TestBase):
|
|
|
|
def test_create_cid(self):
|
|
cid = interpreters.channel_create()
|
|
self.assertIsInstance(cid, interpreters.ChannelID)
|
|
|
|
def test_sequential_ids(self):
|
|
before = interpreters.channel_list_all()
|
|
id1 = interpreters.channel_create()
|
|
id2 = interpreters.channel_create()
|
|
id3 = interpreters.channel_create()
|
|
after = interpreters.channel_list_all()
|
|
|
|
self.assertEqual(id2, int(id1) + 1)
|
|
self.assertEqual(id3, int(id2) + 1)
|
|
self.assertEqual(set(after) - set(before), {id1, id2, id3})
|
|
|
|
def test_ids_global(self):
|
|
id1 = interpreters.create()
|
|
out = _run_output(id1, dedent("""
|
|
import _xxsubinterpreters as _interpreters
|
|
cid = _interpreters.channel_create()
|
|
print(cid)
|
|
"""))
|
|
cid1 = int(out.strip())
|
|
|
|
id2 = interpreters.create()
|
|
out = _run_output(id2, dedent("""
|
|
import _xxsubinterpreters as _interpreters
|
|
cid = _interpreters.channel_create()
|
|
print(cid)
|
|
"""))
|
|
cid2 = int(out.strip())
|
|
|
|
self.assertEqual(cid2, int(cid1) + 1)
|
|
|
|
def test_channel_list_interpreters_none(self):
|
|
"""Test listing interpreters for a channel with no associations."""
|
|
# Test for channel with no associated interpreters.
|
|
cid = interpreters.channel_create()
|
|
send_interps = interpreters.channel_list_interpreters(cid, send=True)
|
|
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
|
|
self.assertEqual(send_interps, [])
|
|
self.assertEqual(recv_interps, [])
|
|
|
|
def test_channel_list_interpreters_basic(self):
|
|
"""Test basic listing channel interpreters."""
|
|
interp0 = interpreters.get_main()
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, "send")
|
|
# Test for a channel that has one end associated to an interpreter.
|
|
send_interps = interpreters.channel_list_interpreters(cid, send=True)
|
|
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
|
|
self.assertEqual(send_interps, [interp0])
|
|
self.assertEqual(recv_interps, [])
|
|
|
|
interp1 = interpreters.create()
|
|
_run_output(interp1, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
obj = _interpreters.channel_recv({cid})
|
|
"""))
|
|
# Test for channel that has boths ends associated to an interpreter.
|
|
send_interps = interpreters.channel_list_interpreters(cid, send=True)
|
|
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
|
|
self.assertEqual(send_interps, [interp0])
|
|
self.assertEqual(recv_interps, [interp1])
|
|
|
|
def test_channel_list_interpreters_multiple(self):
|
|
"""Test listing interpreters for a channel with many associations."""
|
|
interp0 = interpreters.get_main()
|
|
interp1 = interpreters.create()
|
|
interp2 = interpreters.create()
|
|
interp3 = interpreters.create()
|
|
cid = interpreters.channel_create()
|
|
|
|
interpreters.channel_send(cid, "send")
|
|
_run_output(interp1, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
_interpreters.channel_send({cid}, "send")
|
|
"""))
|
|
_run_output(interp2, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
obj = _interpreters.channel_recv({cid})
|
|
"""))
|
|
_run_output(interp3, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
obj = _interpreters.channel_recv({cid})
|
|
"""))
|
|
send_interps = interpreters.channel_list_interpreters(cid, send=True)
|
|
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
|
|
self.assertEqual(set(send_interps), {interp0, interp1})
|
|
self.assertEqual(set(recv_interps), {interp2, interp3})
|
|
|
|
def test_channel_list_interpreters_destroyed(self):
|
|
"""Test listing channel interpreters with a destroyed interpreter."""
|
|
interp0 = interpreters.get_main()
|
|
interp1 = interpreters.create()
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, "send")
|
|
_run_output(interp1, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
obj = _interpreters.channel_recv({cid})
|
|
"""))
|
|
# Should be one interpreter associated with each end.
|
|
send_interps = interpreters.channel_list_interpreters(cid, send=True)
|
|
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
|
|
self.assertEqual(send_interps, [interp0])
|
|
self.assertEqual(recv_interps, [interp1])
|
|
|
|
interpreters.destroy(interp1)
|
|
# Destroyed interpreter should not be listed.
|
|
send_interps = interpreters.channel_list_interpreters(cid, send=True)
|
|
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
|
|
self.assertEqual(send_interps, [interp0])
|
|
self.assertEqual(recv_interps, [])
|
|
|
|
def test_channel_list_interpreters_released(self):
|
|
"""Test listing channel interpreters with a released channel."""
|
|
# Set up one channel with main interpreter on the send end and two
|
|
# subinterpreters on the receive end.
|
|
interp0 = interpreters.get_main()
|
|
interp1 = interpreters.create()
|
|
interp2 = interpreters.create()
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, "data")
|
|
_run_output(interp1, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
obj = _interpreters.channel_recv({cid})
|
|
"""))
|
|
interpreters.channel_send(cid, "data")
|
|
_run_output(interp2, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
obj = _interpreters.channel_recv({cid})
|
|
"""))
|
|
# Check the setup.
|
|
send_interps = interpreters.channel_list_interpreters(cid, send=True)
|
|
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
|
|
self.assertEqual(len(send_interps), 1)
|
|
self.assertEqual(len(recv_interps), 2)
|
|
|
|
# Release the main interpreter from the send end.
|
|
interpreters.channel_release(cid, send=True)
|
|
# Send end should have no associated interpreters.
|
|
send_interps = interpreters.channel_list_interpreters(cid, send=True)
|
|
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
|
|
self.assertEqual(len(send_interps), 0)
|
|
self.assertEqual(len(recv_interps), 2)
|
|
|
|
# Release one of the subinterpreters from the receive end.
|
|
_run_output(interp2, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
_interpreters.channel_release({cid})
|
|
"""))
|
|
# Receive end should have the released interpreter removed.
|
|
send_interps = interpreters.channel_list_interpreters(cid, send=True)
|
|
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
|
|
self.assertEqual(len(send_interps), 0)
|
|
self.assertEqual(recv_interps, [interp1])
|
|
|
|
def test_channel_list_interpreters_closed(self):
|
|
"""Test listing channel interpreters with a closed channel."""
|
|
interp0 = interpreters.get_main()
|
|
interp1 = interpreters.create()
|
|
cid = interpreters.channel_create()
|
|
# Put something in the channel so that it's not empty.
|
|
interpreters.channel_send(cid, "send")
|
|
|
|
# Check initial state.
|
|
send_interps = interpreters.channel_list_interpreters(cid, send=True)
|
|
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
|
|
self.assertEqual(len(send_interps), 1)
|
|
self.assertEqual(len(recv_interps), 0)
|
|
|
|
# Force close the channel.
|
|
interpreters.channel_close(cid, force=True)
|
|
# Both ends should raise an error.
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_list_interpreters(cid, send=True)
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_list_interpreters(cid, send=False)
|
|
|
|
def test_channel_list_interpreters_closed_send_end(self):
|
|
"""Test listing channel interpreters with a channel's send end closed."""
|
|
interp0 = interpreters.get_main()
|
|
interp1 = interpreters.create()
|
|
cid = interpreters.channel_create()
|
|
# Put something in the channel so that it's not empty.
|
|
interpreters.channel_send(cid, "send")
|
|
|
|
# Check initial state.
|
|
send_interps = interpreters.channel_list_interpreters(cid, send=True)
|
|
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
|
|
self.assertEqual(len(send_interps), 1)
|
|
self.assertEqual(len(recv_interps), 0)
|
|
|
|
# Close the send end of the channel.
|
|
interpreters.channel_close(cid, send=True)
|
|
# Send end should raise an error.
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_list_interpreters(cid, send=True)
|
|
# Receive end should not be closed (since channel is not empty).
|
|
recv_interps = interpreters.channel_list_interpreters(cid, send=False)
|
|
self.assertEqual(len(recv_interps), 0)
|
|
|
|
# Close the receive end of the channel from a subinterpreter.
|
|
_run_output(interp1, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
_interpreters.channel_close({cid}, force=True)
|
|
"""))
|
|
# Both ends should raise an error.
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_list_interpreters(cid, send=True)
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_list_interpreters(cid, send=False)
|
|
|
|
####################
|
|
|
|
def test_send_recv_main(self):
|
|
cid = interpreters.channel_create()
|
|
orig = b'spam'
|
|
interpreters.channel_send(cid, orig)
|
|
obj = interpreters.channel_recv(cid)
|
|
|
|
self.assertEqual(obj, orig)
|
|
self.assertIsNot(obj, orig)
|
|
|
|
def test_send_recv_same_interpreter(self):
|
|
id1 = interpreters.create()
|
|
out = _run_output(id1, dedent("""
|
|
import _xxsubinterpreters as _interpreters
|
|
cid = _interpreters.channel_create()
|
|
orig = b'spam'
|
|
_interpreters.channel_send(cid, orig)
|
|
obj = _interpreters.channel_recv(cid)
|
|
assert obj is not orig
|
|
assert obj == orig
|
|
"""))
|
|
|
|
def test_send_recv_different_interpreters(self):
|
|
cid = interpreters.channel_create()
|
|
id1 = interpreters.create()
|
|
out = _run_output(id1, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
_interpreters.channel_send({cid}, b'spam')
|
|
"""))
|
|
obj = interpreters.channel_recv(cid)
|
|
|
|
self.assertEqual(obj, b'spam')
|
|
|
|
def test_send_recv_different_threads(self):
|
|
cid = interpreters.channel_create()
|
|
|
|
def f():
|
|
while True:
|
|
try:
|
|
obj = interpreters.channel_recv(cid)
|
|
break
|
|
except interpreters.ChannelEmptyError:
|
|
time.sleep(0.1)
|
|
interpreters.channel_send(cid, obj)
|
|
t = threading.Thread(target=f)
|
|
t.start()
|
|
|
|
interpreters.channel_send(cid, b'spam')
|
|
t.join()
|
|
obj = interpreters.channel_recv(cid)
|
|
|
|
self.assertEqual(obj, b'spam')
|
|
|
|
def test_send_recv_different_interpreters_and_threads(self):
|
|
cid = interpreters.channel_create()
|
|
id1 = interpreters.create()
|
|
out = None
|
|
|
|
def f():
|
|
nonlocal out
|
|
out = _run_output(id1, dedent(f"""
|
|
import time
|
|
import _xxsubinterpreters as _interpreters
|
|
while True:
|
|
try:
|
|
obj = _interpreters.channel_recv({cid})
|
|
break
|
|
except _interpreters.ChannelEmptyError:
|
|
time.sleep(0.1)
|
|
assert(obj == b'spam')
|
|
_interpreters.channel_send({cid}, b'eggs')
|
|
"""))
|
|
t = threading.Thread(target=f)
|
|
t.start()
|
|
|
|
interpreters.channel_send(cid, b'spam')
|
|
t.join()
|
|
obj = interpreters.channel_recv(cid)
|
|
|
|
self.assertEqual(obj, b'eggs')
|
|
|
|
def test_send_not_found(self):
|
|
with self.assertRaises(interpreters.ChannelNotFoundError):
|
|
interpreters.channel_send(10, b'spam')
|
|
|
|
def test_recv_not_found(self):
|
|
with self.assertRaises(interpreters.ChannelNotFoundError):
|
|
interpreters.channel_recv(10)
|
|
|
|
def test_recv_empty(self):
|
|
cid = interpreters.channel_create()
|
|
with self.assertRaises(interpreters.ChannelEmptyError):
|
|
interpreters.channel_recv(cid)
|
|
|
|
def test_recv_default(self):
|
|
default = object()
|
|
cid = interpreters.channel_create()
|
|
obj1 = interpreters.channel_recv(cid, default)
|
|
interpreters.channel_send(cid, None)
|
|
interpreters.channel_send(cid, 1)
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_send(cid, b'eggs')
|
|
obj2 = interpreters.channel_recv(cid, default)
|
|
obj3 = interpreters.channel_recv(cid, default)
|
|
obj4 = interpreters.channel_recv(cid)
|
|
obj5 = interpreters.channel_recv(cid, default)
|
|
obj6 = interpreters.channel_recv(cid, default)
|
|
|
|
self.assertIs(obj1, default)
|
|
self.assertIs(obj2, None)
|
|
self.assertEqual(obj3, 1)
|
|
self.assertEqual(obj4, b'spam')
|
|
self.assertEqual(obj5, b'eggs')
|
|
self.assertIs(obj6, default)
|
|
|
|
def test_run_string_arg_unresolved(self):
|
|
cid = interpreters.channel_create()
|
|
interp = interpreters.create()
|
|
|
|
out = _run_output(interp, dedent("""
|
|
import _xxsubinterpreters as _interpreters
|
|
print(cid.end)
|
|
_interpreters.channel_send(cid, b'spam')
|
|
"""),
|
|
dict(cid=cid.send))
|
|
obj = interpreters.channel_recv(cid)
|
|
|
|
self.assertEqual(obj, b'spam')
|
|
self.assertEqual(out.strip(), 'send')
|
|
|
|
# XXX For now there is no high-level channel into which the
|
|
# sent channel ID can be converted...
|
|
# Note: this test caused crashes on some buildbots (bpo-33615).
|
|
@unittest.skip('disabled until high-level channels exist')
|
|
def test_run_string_arg_resolved(self):
|
|
cid = interpreters.channel_create()
|
|
cid = interpreters._channel_id(cid, _resolve=True)
|
|
interp = interpreters.create()
|
|
|
|
out = _run_output(interp, dedent("""
|
|
import _xxsubinterpreters as _interpreters
|
|
print(chan.id.end)
|
|
_interpreters.channel_send(chan.id, b'spam')
|
|
"""),
|
|
dict(chan=cid.send))
|
|
obj = interpreters.channel_recv(cid)
|
|
|
|
self.assertEqual(obj, b'spam')
|
|
self.assertEqual(out.strip(), 'send')
|
|
|
|
# close
|
|
|
|
def test_close_single_user(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_close(cid)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_send(cid, b'eggs')
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(cid)
|
|
|
|
def test_close_multiple_users(self):
|
|
cid = interpreters.channel_create()
|
|
id1 = interpreters.create()
|
|
id2 = interpreters.create()
|
|
interpreters.run_string(id1, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
_interpreters.channel_send({cid}, b'spam')
|
|
"""))
|
|
interpreters.run_string(id2, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
_interpreters.channel_recv({cid})
|
|
"""))
|
|
interpreters.channel_close(cid)
|
|
with self.assertRaises(interpreters.RunFailedError) as cm:
|
|
interpreters.run_string(id1, dedent(f"""
|
|
_interpreters.channel_send({cid}, b'spam')
|
|
"""))
|
|
self.assertIn('ChannelClosedError', str(cm.exception))
|
|
with self.assertRaises(interpreters.RunFailedError) as cm:
|
|
interpreters.run_string(id2, dedent(f"""
|
|
_interpreters.channel_send({cid}, b'spam')
|
|
"""))
|
|
self.assertIn('ChannelClosedError', str(cm.exception))
|
|
|
|
def test_close_multiple_times(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_close(cid)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_close(cid)
|
|
|
|
def test_close_empty(self):
|
|
tests = [
|
|
(False, False),
|
|
(True, False),
|
|
(False, True),
|
|
(True, True),
|
|
]
|
|
for send, recv in tests:
|
|
with self.subTest((send, recv)):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_close(cid, send=send, recv=recv)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_send(cid, b'eggs')
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(cid)
|
|
|
|
def test_close_defaults_with_unused_items(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_send(cid, b'ham')
|
|
|
|
with self.assertRaises(interpreters.ChannelNotEmptyError):
|
|
interpreters.channel_close(cid)
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_send(cid, b'eggs')
|
|
|
|
def test_close_recv_with_unused_items_unforced(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_send(cid, b'ham')
|
|
|
|
with self.assertRaises(interpreters.ChannelNotEmptyError):
|
|
interpreters.channel_close(cid, recv=True)
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_send(cid, b'eggs')
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_close(cid, recv=True)
|
|
|
|
def test_close_send_with_unused_items_unforced(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_send(cid, b'ham')
|
|
interpreters.channel_close(cid, send=True)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_send(cid, b'eggs')
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_recv(cid)
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(cid)
|
|
|
|
def test_close_both_with_unused_items_unforced(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_send(cid, b'ham')
|
|
|
|
with self.assertRaises(interpreters.ChannelNotEmptyError):
|
|
interpreters.channel_close(cid, recv=True, send=True)
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_send(cid, b'eggs')
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_close(cid, recv=True)
|
|
|
|
def test_close_recv_with_unused_items_forced(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_send(cid, b'ham')
|
|
interpreters.channel_close(cid, recv=True, force=True)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_send(cid, b'eggs')
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(cid)
|
|
|
|
def test_close_send_with_unused_items_forced(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_send(cid, b'ham')
|
|
interpreters.channel_close(cid, send=True, force=True)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_send(cid, b'eggs')
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(cid)
|
|
|
|
def test_close_both_with_unused_items_forced(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_send(cid, b'ham')
|
|
interpreters.channel_close(cid, send=True, recv=True, force=True)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_send(cid, b'eggs')
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(cid)
|
|
|
|
def test_close_never_used(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_close(cid)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_send(cid, b'spam')
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(cid)
|
|
|
|
def test_close_by_unassociated_interp(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interp = interpreters.create()
|
|
interpreters.run_string(interp, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
_interpreters.channel_close({cid}, force=True)
|
|
"""))
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(cid)
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_close(cid)
|
|
|
|
def test_close_used_multiple_times_by_single_user(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_close(cid, force=True)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_send(cid, b'eggs')
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(cid)
|
|
|
|
def test_channel_list_interpreters_invalid_channel(self):
|
|
cid = interpreters.channel_create()
|
|
# Test for invalid channel ID.
|
|
with self.assertRaises(interpreters.ChannelNotFoundError):
|
|
interpreters.channel_list_interpreters(1000, send=True)
|
|
|
|
interpreters.channel_close(cid)
|
|
# Test for a channel that has been closed.
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_list_interpreters(cid, send=True)
|
|
|
|
def test_channel_list_interpreters_invalid_args(self):
|
|
# Tests for invalid arguments passed to the API.
|
|
cid = interpreters.channel_create()
|
|
with self.assertRaises(TypeError):
|
|
interpreters.channel_list_interpreters(cid)
|
|
|
|
|
|
class ChannelReleaseTests(TestBase):
|
|
|
|
# XXX Add more test coverage a la the tests for close().
|
|
|
|
"""
|
|
- main / interp / other
|
|
- run in: current thread / new thread / other thread / different threads
|
|
- end / opposite
|
|
- force / no force
|
|
- used / not used (associated / not associated)
|
|
- empty / emptied / never emptied / partly emptied
|
|
- closed / not closed
|
|
- released / not released
|
|
- creator (interp) / other
|
|
- associated interpreter not running
|
|
- associated interpreter destroyed
|
|
"""
|
|
|
|
"""
|
|
use
|
|
pre-release
|
|
release
|
|
after
|
|
check
|
|
"""
|
|
|
|
"""
|
|
release in: main, interp1
|
|
creator: same, other (incl. interp2)
|
|
|
|
use: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
|
|
pre-release: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
|
|
pre-release forced: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
|
|
|
|
release: same
|
|
release forced: same
|
|
|
|
use after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
|
|
release after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
|
|
check released: send/recv for same/other(incl. interp2)
|
|
check closed: send/recv for same/other(incl. interp2)
|
|
"""
|
|
|
|
def test_single_user(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_release(cid, send=True, recv=True)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_send(cid, b'eggs')
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(cid)
|
|
|
|
def test_multiple_users(self):
|
|
cid = interpreters.channel_create()
|
|
id1 = interpreters.create()
|
|
id2 = interpreters.create()
|
|
interpreters.run_string(id1, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
_interpreters.channel_send({cid}, b'spam')
|
|
"""))
|
|
out = _run_output(id2, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
obj = _interpreters.channel_recv({cid})
|
|
_interpreters.channel_release({cid})
|
|
print(repr(obj))
|
|
"""))
|
|
interpreters.run_string(id1, dedent(f"""
|
|
_interpreters.channel_release({cid})
|
|
"""))
|
|
|
|
self.assertEqual(out.strip(), "b'spam'")
|
|
|
|
def test_no_kwargs(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_release(cid)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_send(cid, b'eggs')
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(cid)
|
|
|
|
def test_multiple_times(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_release(cid, send=True, recv=True)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_release(cid, send=True, recv=True)
|
|
|
|
def test_with_unused_items(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_send(cid, b'ham')
|
|
interpreters.channel_release(cid, send=True, recv=True)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(cid)
|
|
|
|
def test_never_used(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_release(cid)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_send(cid, b'spam')
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(cid)
|
|
|
|
def test_by_unassociated_interp(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interp = interpreters.create()
|
|
interpreters.run_string(interp, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
_interpreters.channel_release({cid})
|
|
"""))
|
|
obj = interpreters.channel_recv(cid)
|
|
interpreters.channel_release(cid)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_send(cid, b'eggs')
|
|
self.assertEqual(obj, b'spam')
|
|
|
|
def test_close_if_unassociated(self):
|
|
# XXX Something's not right with this test...
|
|
cid = interpreters.channel_create()
|
|
interp = interpreters.create()
|
|
interpreters.run_string(interp, dedent(f"""
|
|
import _xxsubinterpreters as _interpreters
|
|
obj = _interpreters.channel_send({cid}, b'spam')
|
|
_interpreters.channel_release({cid})
|
|
"""))
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(cid)
|
|
|
|
def test_partially(self):
|
|
# XXX Is partial close too weird/confusing?
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, None)
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_release(cid, send=True)
|
|
obj = interpreters.channel_recv(cid)
|
|
|
|
self.assertEqual(obj, b'spam')
|
|
|
|
def test_used_multiple_times_by_single_user(self):
|
|
cid = interpreters.channel_create()
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_send(cid, b'spam')
|
|
interpreters.channel_recv(cid)
|
|
interpreters.channel_release(cid, send=True, recv=True)
|
|
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_send(cid, b'eggs')
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(cid)
|
|
|
|
|
|
class ChannelCloseFixture(namedtuple('ChannelCloseFixture',
|
|
'end interp other extra creator')):
|
|
|
|
# Set this to True to avoid creating interpreters, e.g. when
|
|
# scanning through test permutations without running them.
|
|
QUICK = False
|
|
|
|
def __new__(cls, end, interp, other, extra, creator):
|
|
assert end in ('send', 'recv')
|
|
if cls.QUICK:
|
|
known = {}
|
|
else:
|
|
interp = Interpreter.from_raw(interp)
|
|
other = Interpreter.from_raw(other)
|
|
extra = Interpreter.from_raw(extra)
|
|
known = {
|
|
interp.name: interp,
|
|
other.name: other,
|
|
extra.name: extra,
|
|
}
|
|
if not creator:
|
|
creator = 'same'
|
|
self = super().__new__(cls, end, interp, other, extra, creator)
|
|
self._prepped = set()
|
|
self._state = ChannelState()
|
|
self._known = known
|
|
return self
|
|
|
|
@property
|
|
def state(self):
|
|
return self._state
|
|
|
|
@property
|
|
def cid(self):
|
|
try:
|
|
return self._cid
|
|
except AttributeError:
|
|
creator = self._get_interpreter(self.creator)
|
|
self._cid = self._new_channel(creator)
|
|
return self._cid
|
|
|
|
def get_interpreter(self, interp):
|
|
interp = self._get_interpreter(interp)
|
|
self._prep_interpreter(interp)
|
|
return interp
|
|
|
|
def expect_closed_error(self, end=None):
|
|
if end is None:
|
|
end = self.end
|
|
if end == 'recv' and self.state.closed == 'send':
|
|
return False
|
|
return bool(self.state.closed)
|
|
|
|
def prep_interpreter(self, interp):
|
|
self._prep_interpreter(interp)
|
|
|
|
def record_action(self, action, result):
|
|
self._state = result
|
|
|
|
def clean_up(self):
|
|
clean_up_interpreters()
|
|
clean_up_channels()
|
|
|
|
# internal methods
|
|
|
|
def _new_channel(self, creator):
|
|
if creator.name == 'main':
|
|
return interpreters.channel_create()
|
|
else:
|
|
ch = interpreters.channel_create()
|
|
run_interp(creator.id, f"""
|
|
import _xxsubinterpreters
|
|
cid = _xxsubinterpreters.channel_create()
|
|
# We purposefully send back an int to avoid tying the
|
|
# channel to the other interpreter.
|
|
_xxsubinterpreters.channel_send({ch}, int(cid))
|
|
del _xxsubinterpreters
|
|
""")
|
|
self._cid = interpreters.channel_recv(ch)
|
|
return self._cid
|
|
|
|
def _get_interpreter(self, interp):
|
|
if interp in ('same', 'interp'):
|
|
return self.interp
|
|
elif interp == 'other':
|
|
return self.other
|
|
elif interp == 'extra':
|
|
return self.extra
|
|
else:
|
|
name = interp
|
|
try:
|
|
interp = self._known[name]
|
|
except KeyError:
|
|
interp = self._known[name] = Interpreter(name)
|
|
return interp
|
|
|
|
def _prep_interpreter(self, interp):
|
|
if interp.id in self._prepped:
|
|
return
|
|
self._prepped.add(interp.id)
|
|
if interp.name == 'main':
|
|
return
|
|
run_interp(interp.id, f"""
|
|
import _xxsubinterpreters as interpreters
|
|
import test.test__xxsubinterpreters as helpers
|
|
ChannelState = helpers.ChannelState
|
|
try:
|
|
cid
|
|
except NameError:
|
|
cid = interpreters._channel_id({self.cid})
|
|
""")
|
|
|
|
|
|
@unittest.skip('these tests take several hours to run')
|
|
class ExhaustiveChannelTests(TestBase):
|
|
|
|
"""
|
|
- main / interp / other
|
|
- run in: current thread / new thread / other thread / different threads
|
|
- end / opposite
|
|
- force / no force
|
|
- used / not used (associated / not associated)
|
|
- empty / emptied / never emptied / partly emptied
|
|
- closed / not closed
|
|
- released / not released
|
|
- creator (interp) / other
|
|
- associated interpreter not running
|
|
- associated interpreter destroyed
|
|
|
|
- close after unbound
|
|
"""
|
|
|
|
"""
|
|
use
|
|
pre-close
|
|
close
|
|
after
|
|
check
|
|
"""
|
|
|
|
"""
|
|
close in: main, interp1
|
|
creator: same, other, extra
|
|
|
|
use: None,send,recv,send/recv in None,same,other,same+other,all
|
|
pre-close: None,send,recv in None,same,other,same+other,all
|
|
pre-close forced: None,send,recv in None,same,other,same+other,all
|
|
|
|
close: same
|
|
close forced: same
|
|
|
|
use after: None,send,recv,send/recv in None,same,other,extra,same+other,all
|
|
close after: None,send,recv,send/recv in None,same,other,extra,same+other,all
|
|
check closed: send/recv for same/other(incl. interp2)
|
|
"""
|
|
|
|
def iter_action_sets(self):
|
|
# - used / not used (associated / not associated)
|
|
# - empty / emptied / never emptied / partly emptied
|
|
# - closed / not closed
|
|
# - released / not released
|
|
|
|
# never used
|
|
yield []
|
|
|
|
# only pre-closed (and possible used after)
|
|
for closeactions in self._iter_close_action_sets('same', 'other'):
|
|
yield closeactions
|
|
for postactions in self._iter_post_close_action_sets():
|
|
yield closeactions + postactions
|
|
for closeactions in self._iter_close_action_sets('other', 'extra'):
|
|
yield closeactions
|
|
for postactions in self._iter_post_close_action_sets():
|
|
yield closeactions + postactions
|
|
|
|
# used
|
|
for useactions in self._iter_use_action_sets('same', 'other'):
|
|
yield useactions
|
|
for closeactions in self._iter_close_action_sets('same', 'other'):
|
|
actions = useactions + closeactions
|
|
yield actions
|
|
for postactions in self._iter_post_close_action_sets():
|
|
yield actions + postactions
|
|
for closeactions in self._iter_close_action_sets('other', 'extra'):
|
|
actions = useactions + closeactions
|
|
yield actions
|
|
for postactions in self._iter_post_close_action_sets():
|
|
yield actions + postactions
|
|
for useactions in self._iter_use_action_sets('other', 'extra'):
|
|
yield useactions
|
|
for closeactions in self._iter_close_action_sets('same', 'other'):
|
|
actions = useactions + closeactions
|
|
yield actions
|
|
for postactions in self._iter_post_close_action_sets():
|
|
yield actions + postactions
|
|
for closeactions in self._iter_close_action_sets('other', 'extra'):
|
|
actions = useactions + closeactions
|
|
yield actions
|
|
for postactions in self._iter_post_close_action_sets():
|
|
yield actions + postactions
|
|
|
|
def _iter_use_action_sets(self, interp1, interp2):
|
|
interps = (interp1, interp2)
|
|
|
|
# only recv end used
|
|
yield [
|
|
ChannelAction('use', 'recv', interp1),
|
|
]
|
|
yield [
|
|
ChannelAction('use', 'recv', interp2),
|
|
]
|
|
yield [
|
|
ChannelAction('use', 'recv', interp1),
|
|
ChannelAction('use', 'recv', interp2),
|
|
]
|
|
|
|
# never emptied
|
|
yield [
|
|
ChannelAction('use', 'send', interp1),
|
|
]
|
|
yield [
|
|
ChannelAction('use', 'send', interp2),
|
|
]
|
|
yield [
|
|
ChannelAction('use', 'send', interp1),
|
|
ChannelAction('use', 'send', interp2),
|
|
]
|
|
|
|
# partially emptied
|
|
for interp1 in interps:
|
|
for interp2 in interps:
|
|
for interp3 in interps:
|
|
yield [
|
|
ChannelAction('use', 'send', interp1),
|
|
ChannelAction('use', 'send', interp2),
|
|
ChannelAction('use', 'recv', interp3),
|
|
]
|
|
|
|
# fully emptied
|
|
for interp1 in interps:
|
|
for interp2 in interps:
|
|
for interp3 in interps:
|
|
for interp4 in interps:
|
|
yield [
|
|
ChannelAction('use', 'send', interp1),
|
|
ChannelAction('use', 'send', interp2),
|
|
ChannelAction('use', 'recv', interp3),
|
|
ChannelAction('use', 'recv', interp4),
|
|
]
|
|
|
|
def _iter_close_action_sets(self, interp1, interp2):
|
|
ends = ('recv', 'send')
|
|
interps = (interp1, interp2)
|
|
for force in (True, False):
|
|
op = 'force-close' if force else 'close'
|
|
for interp in interps:
|
|
for end in ends:
|
|
yield [
|
|
ChannelAction(op, end, interp),
|
|
]
|
|
for recvop in ('close', 'force-close'):
|
|
for sendop in ('close', 'force-close'):
|
|
for recv in interps:
|
|
for send in interps:
|
|
yield [
|
|
ChannelAction(recvop, 'recv', recv),
|
|
ChannelAction(sendop, 'send', send),
|
|
]
|
|
|
|
def _iter_post_close_action_sets(self):
|
|
for interp in ('same', 'extra', 'other'):
|
|
yield [
|
|
ChannelAction('use', 'recv', interp),
|
|
]
|
|
yield [
|
|
ChannelAction('use', 'send', interp),
|
|
]
|
|
|
|
def run_actions(self, fix, actions):
|
|
for action in actions:
|
|
self.run_action(fix, action)
|
|
|
|
def run_action(self, fix, action, *, hideclosed=True):
|
|
end = action.resolve_end(fix.end)
|
|
interp = action.resolve_interp(fix.interp, fix.other, fix.extra)
|
|
fix.prep_interpreter(interp)
|
|
if interp.name == 'main':
|
|
result = run_action(
|
|
fix.cid,
|
|
action.action,
|
|
end,
|
|
fix.state,
|
|
hideclosed=hideclosed,
|
|
)
|
|
fix.record_action(action, result)
|
|
else:
|
|
_cid = interpreters.channel_create()
|
|
run_interp(interp.id, f"""
|
|
result = helpers.run_action(
|
|
{fix.cid},
|
|
{repr(action.action)},
|
|
{repr(end)},
|
|
{repr(fix.state)},
|
|
hideclosed={hideclosed},
|
|
)
|
|
interpreters.channel_send({_cid}, result.pending.to_bytes(1, 'little'))
|
|
interpreters.channel_send({_cid}, b'X' if result.closed else b'')
|
|
""")
|
|
result = ChannelState(
|
|
pending=int.from_bytes(interpreters.channel_recv(_cid), 'little'),
|
|
closed=bool(interpreters.channel_recv(_cid)),
|
|
)
|
|
fix.record_action(action, result)
|
|
|
|
def iter_fixtures(self):
|
|
# XXX threads?
|
|
interpreters = [
|
|
('main', 'interp', 'extra'),
|
|
('interp', 'main', 'extra'),
|
|
('interp1', 'interp2', 'extra'),
|
|
('interp1', 'interp2', 'main'),
|
|
]
|
|
for interp, other, extra in interpreters:
|
|
for creator in ('same', 'other', 'creator'):
|
|
for end in ('send', 'recv'):
|
|
yield ChannelCloseFixture(end, interp, other, extra, creator)
|
|
|
|
def _close(self, fix, *, force):
|
|
op = 'force-close' if force else 'close'
|
|
close = ChannelAction(op, fix.end, 'same')
|
|
if not fix.expect_closed_error():
|
|
self.run_action(fix, close, hideclosed=False)
|
|
else:
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
self.run_action(fix, close, hideclosed=False)
|
|
|
|
def _assert_closed_in_interp(self, fix, interp=None):
|
|
if interp is None or interp.name == 'main':
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_recv(fix.cid)
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_send(fix.cid, b'spam')
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_close(fix.cid)
|
|
with self.assertRaises(interpreters.ChannelClosedError):
|
|
interpreters.channel_close(fix.cid, force=True)
|
|
else:
|
|
run_interp(interp.id, f"""
|
|
with helpers.expect_channel_closed():
|
|
interpreters.channel_recv(cid)
|
|
""")
|
|
run_interp(interp.id, f"""
|
|
with helpers.expect_channel_closed():
|
|
interpreters.channel_send(cid, b'spam')
|
|
""")
|
|
run_interp(interp.id, f"""
|
|
with helpers.expect_channel_closed():
|
|
interpreters.channel_close(cid)
|
|
""")
|
|
run_interp(interp.id, f"""
|
|
with helpers.expect_channel_closed():
|
|
interpreters.channel_close(cid, force=True)
|
|
""")
|
|
|
|
def _assert_closed(self, fix):
|
|
self.assertTrue(fix.state.closed)
|
|
|
|
for _ in range(fix.state.pending):
|
|
interpreters.channel_recv(fix.cid)
|
|
self._assert_closed_in_interp(fix)
|
|
|
|
for interp in ('same', 'other'):
|
|
interp = fix.get_interpreter(interp)
|
|
if interp.name == 'main':
|
|
continue
|
|
self._assert_closed_in_interp(fix, interp)
|
|
|
|
interp = fix.get_interpreter('fresh')
|
|
self._assert_closed_in_interp(fix, interp)
|
|
|
|
def _iter_close_tests(self, verbose=False):
|
|
i = 0
|
|
for actions in self.iter_action_sets():
|
|
print()
|
|
for fix in self.iter_fixtures():
|
|
i += 1
|
|
if i > 1000:
|
|
return
|
|
if verbose:
|
|
if (i - 1) % 6 == 0:
|
|
print()
|
|
print(i, fix, '({} actions)'.format(len(actions)))
|
|
else:
|
|
if (i - 1) % 6 == 0:
|
|
print(' ', end='')
|
|
print('.', end=''); sys.stdout.flush()
|
|
yield i, fix, actions
|
|
if verbose:
|
|
print('---')
|
|
print()
|
|
|
|
# This is useful for scanning through the possible tests.
|
|
def _skim_close_tests(self):
|
|
ChannelCloseFixture.QUICK = True
|
|
for i, fix, actions in self._iter_close_tests():
|
|
pass
|
|
|
|
def test_close(self):
|
|
for i, fix, actions in self._iter_close_tests():
|
|
with self.subTest('{} {} {}'.format(i, fix, actions)):
|
|
fix.prep_interpreter(fix.interp)
|
|
self.run_actions(fix, actions)
|
|
|
|
self._close(fix, force=False)
|
|
|
|
self._assert_closed(fix)
|
|
# XXX Things slow down if we have too many interpreters.
|
|
fix.clean_up()
|
|
|
|
def test_force_close(self):
|
|
for i, fix, actions in self._iter_close_tests():
|
|
with self.subTest('{} {} {}'.format(i, fix, actions)):
|
|
fix.prep_interpreter(fix.interp)
|
|
self.run_actions(fix, actions)
|
|
|
|
self._close(fix, force=True)
|
|
|
|
self._assert_closed(fix)
|
|
# XXX Things slow down if we have too many interpreters.
|
|
fix.clean_up()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|