Skip to content

Commit

Permalink
Support asyncio actors with the trio spawner backend
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Aug 4, 2020
1 parent ba6bd3f commit 580ba8b
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 4 deletions.
8 changes: 6 additions & 2 deletions tractor/_child.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ def parse_ipaddr(arg):
return (str(host), int(port))


from ._entry import _trio_main

if __name__ == "__main__":

parser = argparse.ArgumentParser()
parser.add_argument("--uid", type=parse_uid)
parser.add_argument("--loglevel", type=str)
parser.add_argument("--parent_addr", type=parse_ipaddr)
parser.add_argument("--asyncio", action='store_true')
args = parser.parse_args()

subactor = Actor(
Expand All @@ -36,5 +39,6 @@ def parse_ipaddr(arg):

_trio_main(
subactor,
parent_addr=args.parent_addr
)
parent_addr=args.parent_addr,
infect_asyncio=args.asyncio,
)
13 changes: 11 additions & 2 deletions tractor/_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def _mp_main(
forkserver_info: Tuple[Any, Any, Any, Any, Any],
start_method: str,
parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:
"""The routine called *after fork* which invokes a fresh ``trio.run``
"""
Expand Down Expand Up @@ -58,13 +59,17 @@ def _mp_main(

def _trio_main(
actor: 'Actor',
parent_addr: Tuple[str, int] = None
*,
parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:
"""Entry point for a `trio_run_in_process` subactor.
"""
# TODO: make a global func to set this or is it too hacky?
# os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.breakpoint'

log.info(f"Started new trio process for {actor.uid}")

if actor.loglevel is not None:
log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
Expand All @@ -82,7 +87,11 @@ def _trio_main(
)

try:
trio.run(trio_main)
if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt:
log.warning(f"Actor {actor.uid} received KBI")

Expand Down
8 changes: 8 additions & 0 deletions tractor/_spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ async def cancel_on_completion(
async def spawn_subactor(
subactor: 'Actor',
parent_addr: Tuple[str, int],
infect_asyncio: bool,
):

spawn_cmd = [
Expand All @@ -176,6 +177,10 @@ async def spawn_subactor(
subactor.loglevel
]

# Tell child to run in guest mode on top of ``asyncio`` loop
if infect_asyncio:
spawn_cmd.append("--asyncio")

proc = await trio.open_process(spawn_cmd)
yield proc

Expand All @@ -197,6 +202,7 @@ async def new_proc(
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
use_trio_run_in_process: bool = False,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
"""Create a new ``multiprocessing.Process`` using the
Expand All @@ -212,6 +218,7 @@ async def new_proc(
async with spawn_subactor(
subactor,
parent_addr,
infect_asyncio=infect_asyncio
) as proc:
log.info(f"Started {proc}")

Expand Down Expand Up @@ -296,6 +303,7 @@ async def new_proc(
fs_info,
start_method,
parent_addr,
infect_asyncio,
),
# daemon=True,
name=name,
Expand Down

0 comments on commit 580ba8b

Please sign in to comment.