Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Infect asyncio #121

Merged
merged 53 commits into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
acd63d0
First draft "infected `asyncio` mode"
goodboy Jun 29, 2020
1825b21
Wow, fix all the broken async func invoking code..
goodboy Jul 3, 2020
055788c
Attempt to make mypy happy..
goodboy Jul 21, 2020
1406ddc
Add `infect_asyncio: bool` flag to nursery methods
goodboy Jul 26, 2020
8070b16
Support asyncio actors with the trio spawner backend
goodboy Jul 27, 2020
2cf8714
Log any asyncio error
goodboy Sep 12, 2020
80f47de
Raise from asyncio error; fixes mypy
goodboy Oct 14, 2020
509ae13
Raise any asyncio errors if in trio task on cancel
goodboy Dec 10, 2020
340effa
Add initial infected asyncio error propagation test
goodboy Dec 10, 2020
d80f8d7
WIP redo asyncio async gen streaming
goodboy Apr 27, 2021
793bcfb
Pass `infect_asyncio` flag to mp actors as well
goodboy May 10, 2021
aa24bbc
Proxy asyncio cancelleds as well
goodboy May 12, 2021
55e210f
Drop bad .close() call
goodboy Jun 2, 2021
325c0cd
Fix error propagation on asyncio streaming tasks
goodboy Jul 28, 2021
d9dac3f
Drop old implementation cruft
goodboy Aug 2, 2021
c262b1a
Always cancel the asyncio task?
goodboy Sep 18, 2021
b376b7c
First draft: `.to_asyncio.open_channel_from()`
goodboy Oct 8, 2021
7a65165
Facepalm, re-raise captured `asyncio` task error
goodboy Oct 14, 2021
41eddff
Drop old (and deluded) "streaming" cruft
goodboy Oct 14, 2021
299e419
Plan asyncio test set
goodboy Nov 5, 2021
446feff
Clean type imports
goodboy Nov 5, 2021
06fa650
Drop runtime logging for asyncio mode
goodboy Nov 7, 2021
0ab5e5c
Fill out nursery docstring
goodboy Nov 7, 2021
5635724
Add a `Portal.cancel_actor()` test
goodboy Nov 7, 2021
1114b69
Adjust linked-loop-task tear down sequence
goodboy Nov 17, 2021
04c0eda
Add an `asyncio`-internal cancel test
goodboy Nov 17, 2021
8704664
Reverse the order for asyncio cancelleds? I dunno why
goodboy Nov 17, 2021
c19123b
Add trio-cancels-anursery-cancels-aio test
goodboy Nov 18, 2021
e815f76
Add a cancelled-from-remote-trio-task case
goodboy Nov 18, 2021
e6687bc
Serious-ify doc string
goodboy Nov 19, 2021
9bc94b5
Factor error translation into a ctx mngr
goodboy Nov 20, 2021
d27ddb7
Add a basic `open_channel_from()` streaming test
goodboy Nov 20, 2021
44d0e9f
Add a `LinkedTaskChannel` for synced inter-loop-streaming
goodboy Nov 22, 2021
ad2567d
Add first set of interloop streaming tests
goodboy Nov 22, 2021
c48c68c
Flip doc strings to my preferred format
goodboy Nov 22, 2021
5f40946
Re-wrap and raise `asyncio.CancelledError`
goodboy Nov 23, 2021
6803891
Collect `asyncio` task exceptions to avoid warning msg
goodboy Nov 24, 2021
c4b3bb3
Port tests to handle our new `asyncio` cancelled type
goodboy Nov 24, 2021
b69412a
Drop cancel scope from linked task channel
goodboy Nov 25, 2021
2b9b29e
Add an asyncio echo server test
goodboy Nov 25, 2021
9a2de90
Add mid stream echoserver "bail" cases
goodboy Nov 28, 2021
56cc983
Return channel type from `_run_asyncio_task()`
goodboy Nov 28, 2021
24078f2
More doc string style tweaks
goodboy Dec 2, 2021
d65912e
Increase kbi delay in remote cancel test
goodboy Dec 2, 2021
b463841
Add infected `asyncio` echo server example
goodboy Dec 10, 2021
7237d69
Add asyncio echo server ex to readme; fix cluster section
goodboy Dec 10, 2021
6952c7d
Add features bullet, slip in a guille-ism
goodboy Dec 10, 2021
1fdcaf3
Not enough time for new asyncio tests?
goodboy Dec 10, 2021
73d252e
Emphasize `asyncio` only with sleeps
goodboy Dec 11, 2021
4c0cfa6
Link to SC on wikipedia
goodboy Mar 11, 2021
4d1a48a
Link to inter-loop channel issue in readme
goodboy Dec 12, 2021
9b14d82
Add nooz
goodboy Dec 12, 2021
9b4cdb0
Add agpl header
goodboy Dec 15, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:

testing-linux:
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}'
timeout-minutes: 9
timeout-minutes: 10
runs-on: ${{ matrix.os }}

strategy:
Expand Down
121 changes: 119 additions & 2 deletions docs/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ Features
- A modular transport stack, allowing for custom serialization (eg.
`msgspec`_), communications protocols, and environment specific IPC
primitives
- `structured concurrency`_ from the ground up
- Support for spawning process-level-SC, inter-loop one-to-one-task oriented
``asyncio`` actors via "infected ``asyncio``" mode
- `structured chadcurrency`_ from the ground up


Run a func in a process
Expand Down Expand Up @@ -313,6 +315,117 @@ real time::
This uses no extra threads, fancy semaphores or futures; all we need
is ``tractor``'s IPC!

"Infected ``asyncio``" mode
---------------------------
Have a bunch of ``asyncio`` code you want to force to be SC at the process level?

Check out our experimental system for `guest-mode`_ controlled
``asyncio`` actors:

.. code:: python

import asyncio
from statistics import mean
import time

import trio
import tractor


async def aio_echo_server(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
) -> None:

# a first message must be sent **from** this ``asyncio``
# task or the ``trio`` side will never unblock from
# ``tractor.to_asyncio.open_channel_from():``
to_trio.send_nowait('start')

# XXX: this uses an ``from_trio: asyncio.Queue`` currently but we
# should probably offer something better.
while True:
# echo the msg back
to_trio.send_nowait(await from_trio.get())
await asyncio.sleep(0)


@tractor.context
async def trio_to_aio_echo_server(
ctx: tractor.Context,
):
# this will block until the ``asyncio`` task sends a "first"
# message.
async with tractor.to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan):

assert first == 'start'
await ctx.started(first)

async with ctx.open_stream() as stream:

async for msg in stream:
await chan.send(msg)

out = await chan.receive()
# echo back to parent actor-task
await stream.send(out)


async def main():

async with tractor.open_nursery() as n:
p = await n.start_actor(
'aio_server',
enable_modules=[__name__],
infect_asyncio=True,
)
async with p.open_context(
trio_to_aio_echo_server,
) as (ctx, first):

assert first == 'start'

count = 0
async with ctx.open_stream() as stream:

delays = []
send = time.time()

await stream.send(count)
async for msg in stream:
recv = time.time()
delays.append(recv - send)
assert msg == count
count += 1
send = time.time()
await stream.send(count)

if count >= 1e3:
break

print(f'mean round trip rate (Hz): {1/mean(delays)}')
await p.cancel_actor()


if __name__ == '__main__':
trio.run(main)


Yes, we spawn a python process, run ``asyncio``, start ``trio`` on the
``asyncio`` loop, then send commands to the ``trio`` scheduled tasks to
tell ``asyncio`` tasks what to do XD

We need help refining the `asyncio`-side channel API to be more
`trio`-like. Feel free to sling your opinion in `#273`_!


.. _#273: https://github.com/goodboy/tractor/issues/273


Higher level "cluster" APIs
---------------------------
To be extra terse the ``tractor`` devs have started hacking some "higher
level" APIs for managing actor trees/clusters. These interfaces should
generally be condsidered provisional for now but we encourage you to try
Expand Down Expand Up @@ -476,6 +589,7 @@ channel`_!
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
.. _trio gitter channel: https://gitter.im/python-trio/general
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org
Expand All @@ -484,11 +598,14 @@ channel`_!
.. _messages: https://en.wikipedia.org/wiki/Message_passing
.. _trio docs: https://trio.readthedocs.io/en/latest/
.. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
.. _structured concurrency: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _structured chadcurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
.. _async generators: https://www.python.org/dev/peps/pep-0525/
.. _trio-parallel: https://github.com/richardsheridan/trio-parallel
.. _msgspec: https://jcristharif.com/msgspec/
.. _guest-mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops


.. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fgoodboy%2Ftractor%2Fbadge&style=popout-square
Expand Down
91 changes: 91 additions & 0 deletions examples/infected_asyncio_echo_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
'''
An SC compliant infected ``asyncio`` echo server.

'''
import asyncio
from statistics import mean
import time

import trio
import tractor


async def aio_echo_server(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
) -> None:

# a first message must be sent **from** this ``asyncio``
# task or the ``trio`` side will never unblock from
# ``tractor.to_asyncio.open_channel_from():``
to_trio.send_nowait('start')

# XXX: this uses an ``from_trio: asyncio.Queue`` currently but we
# should probably offer something better.
while True:
# echo the msg back
to_trio.send_nowait(await from_trio.get())
await asyncio.sleep(0)


@tractor.context
async def trio_to_aio_echo_server(
ctx: tractor.Context,
):
# this will block until the ``asyncio`` task sends a "first"
# message.
async with tractor.to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan):

assert first == 'start'
await ctx.started(first)

async with ctx.open_stream() as stream:

async for msg in stream:
await chan.send(msg)

out = await chan.receive()
# echo back to parent actor-task
await stream.send(out)


async def main():

async with tractor.open_nursery() as n:
p = await n.start_actor(
'aio_server',
enable_modules=[__name__],
infect_asyncio=True,
)
async with p.open_context(
trio_to_aio_echo_server,
) as (ctx, first):

assert first == 'start'

count = 0
async with ctx.open_stream() as stream:

delays = []
send = time.time()

await stream.send(count)
async for msg in stream:
recv = time.time()
delays.append(recv - send)
assert msg == count
count += 1
send = time.time()
await stream.send(count)

if count >= 1e3:
break

print(f'mean round trip rate (Hz): {1/mean(delays)}')
await p.cancel_actor()


if __name__ == '__main__':
trio.run(main)
28 changes: 28 additions & 0 deletions newsfragments/121.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
Add "infected ``asyncio`` mode; a sub-system to spawn and control
``asyncio`` actors using ``trio``'s guest-mode.

This gets us the following very interesting functionality:

- ability to spawn an actor that has a process entry point of
``asyncio.run()`` by passing ``infect_asyncio=True`` to
``Portal.start_actor()`` (and friends).
- the ``asyncio`` actor embeds ``trio`` using guest-mode and starts
a main ``trio`` task which runs the ``tractor.Actor._async_main()``
entry point engages all the normal ``tractor`` runtime IPC/messaging
machinery; for all purposes the actor is now running normally on
a ``trio.run()``.
- the actor can now make one-to-one task spawning requests to the
underlying ``asyncio`` event loop using either of:
* ``to_asyncio.run_task()`` to spawn and run an ``asyncio`` task to
completion and block until a return value is delivered.
* ``async with to_asyncio.open_channel_from():`` which spawns a task
and hands it a pair of "memory channels" to allow for bi-directional
streaming between the now SC-linked ``trio`` and ``asyncio`` tasks.

The output from any call(s) to ``asyncio`` can be handled as normal in
``trio``/``tractor`` task operation with the caveat of the overhead due
to guest-mode use.

For more details see the `original PR
<https://github.com/goodboy/tractor/pull/121>`_ and `issue
<https://github.com/goodboy/tractor/issues/120>`_.
2 changes: 1 addition & 1 deletion tests/test_cancellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ def test_fast_graceful_cancel_when_spawn_task_in_soft_proc_wait_for_daemon(
cancellation, and it's faster, we might as well do it.

'''
kbi_delay = 0.2
kbi_delay = 0.5

async def main():
start = time.time()
Expand Down
5 changes: 3 additions & 2 deletions tests/test_docs_examples.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
'''
Let's make sure them docs work yah?
"""

'''
from contextlib import contextmanager
import itertools
import os
Expand Down
Loading