Introspecting and extending Trio with trio.lowlevel
trio.lowlevel contains low-level APIs for introspecting and extending Trio. If you’re writing ordinary, everyday code, then you can ignore this module completely. But sometimes you need something a bit lower level. Here are some examples of situations where you should reach for trio.lowlevel:
You want to implement a new synchronization primitive that Trio doesn’t (yet) provide, like a reader-writer lock.
You want to extract low-level metrics to monitor the health of your application.
You want to use a low-level operating system interface that Trio doesn’t (yet) provide its own wrappers for, like watching a filesystem directory for changes.
You want to implement an interface for calling between Trio and another event loop within the same process.
You’re writing a debugger and want to visualize Trio’s task tree.
You need to interoperate with a C library whose API exposes raw file descriptors.
You don’t need to be scared of trio.lowlevel, as long as you take proper precautions. These are real public APIs, with strictly defined and carefully documented semantics. They’re the same tools we use to implement all the nice high-level APIs in the trio namespace. But, be careful. Some of those strict semantics have nasty big pointy teeth. If you make a mistake, Trio may not be able to handle it gracefully; conventions and guarantees that are followed strictly in the rest of Trio do not always apply. When you use this module, it’s your job to think about how you’re going to handle the tricky cases so you can expose a friendly Trio-style API to your users.
Debugging and instrumentation
Trio tries hard to provide useful hooks for debugging and instrumentation. Some are documented above (the nursery introspection attributes, trio.Lock.statistics(), etc.). Here are some more.
Global statistics
-
Returns an object containing run-loop-level debugging information.
Currently the following fields are defined:
tasks_living (int): The number of tasks that have been spawned and not yet exited.
tasks_runnable (int): The number of tasks that are currently queued on the run queue (as opposed to blocked waiting for something to happen).
seconds_to_next_deadline (float): The time until the next pending cancel scope deadline. May be negative if the deadline has expired but we haven’t yet processed cancellations. May be inf if there are no pending deadlines.
run_sync_soon_queue_size (int): The number of unprocessed callbacks queued via trio.lowlevel.TrioToken.run_sync_soon().
io_statistics (object): Some statistics from Trio’s I/O backend. This always has an attribute backend, which is a string naming which operating-system-specific I/O backend is in use; the other attributes vary between backends.
trio.lowlevel.current_statistics()
The current clock
-
Returns the current Clock.
trio.lowlevel.current_clock()
Instrument API
The instrument API provides a standard way to add custom instrumentation to the run loop. Want to make a histogram of scheduling latencies, log a stack trace of any task that blocks the run loop for >50 ms, or measure what percentage of your process’s running time is spent waiting for I/O? This is the place.
The general idea is that at any given moment, trio.run() maintains a set of “instruments”, which are objects that implement the trio.abc.Instrument interface. When an interesting event happens, it loops over these instruments and notifies them by calling an appropriate method. The tutorial has a simple example of using this for tracing.
Since this hooks into Trio at a rather low level, you do have to be careful. The callbacks are run synchronously, and in many cases if they error out then there isn’t any plausible way to propagate this exception (for instance, we might be deep in the guts of the exception propagation machinery…). Therefore our current strategy for handling exceptions raised by instruments is to (a) log an exception to the "trio.abc.Instrument" logger, which by default prints a stack trace to standard error, and (b) disable the offending instrument.
You can register an initial list of instruments by passing them to trio.run(). add_instrument() and remove_instrument() let you add and remove instruments at runtime.
-
Start instrumenting the current run loop with the given instrument.
If instrument is already active, does nothing.
Parameters
instrument (trio.abc.Instrument) – The instrument to activate.
trio.lowlevel.add_instrument(instrument: trio.abc.Instrument) → None
-
Stop instrumenting the current run loop with the given instrument.
Parameters
instrument (trio.abc.Instrument) – The instrument to de-activate.
Raises
KeyError – if the instrument is not currently active. This could occur either because you never added it, or because you added it and then it raised an unhandled exception and was automatically deactivated.
trio.lowlevel.remove_instrument(instrument: trio.abc.Instrument) → None
And here’s the interface to implement if you want to build your own Instrument:
-
The interface for run loop instrumentation.
Instruments don’t have to inherit from this abstract base class, and all of these methods are optional. This class serves mostly as documentation.
-
Called after handling pending I/O.
Parameters
timeout (float) – The number of seconds we were willing to wait. This much time may or may not have elapsed, depending on whether any I/O was ready.
after_io_wait(timeout)
-
Called just before trio.run() returns.
after_run()
-
Called when we return to the main run loop after a task has yielded.
Parameters
task (trio.lowlevel.Task) – The task that just ran.
after_task_step(task)
-
Called before blocking to wait for I/O readiness.
Parameters
timeout (float) – The number of seconds we are willing to wait.
before_io_wait(timeout)
-
Called at the beginning of trio.run().
before_run()
-
Called immediately before we resume running the given task.
Parameters
task (trio.lowlevel.Task) – The task that is about to run.
before_task_step(task)
-
Called when the given task exits.
Parameters
task (trio.lowlevel.Task) – The finished task.
task_exited(task)
-
Called when the given task becomes runnable.
It may still be some time before it actually runs, if there are other runnable tasks ahead of it.
Parameters
task (trio.lowlevel.Task) – The task that became runnable.
task_scheduled(task)
-
Called when the given task is created.
Parameters
task (trio.lowlevel.Task) – The new task.
task_spawned(task)
-
class trio.abc.Instrument
The tutorial has a fully-worked example of defining a custom instrument to log Trio’s internal scheduling decisions.
Low-level I/O primitives
Different environments expose different low-level APIs for performing async I/O. trio.lowlevel exposes these APIs in a relatively direct way, so as to allow maximum power and flexibility for higher level code. However, this means that the exact API provided may vary depending on what system Trio is running on.
Universally available API
All environments provide the following functions:
-
Block until the kernel reports that the given object is readable.
On Unix systems, obj must either be an integer file descriptor, or else an object with a .fileno() method which returns an integer file descriptor. Any kind of file descriptor can be passed, though the exact semantics will depend on your kernel. For example, this probably won’t do anything useful for on-disk files.
On Windows systems, obj must either be an integer SOCKET handle, or else an object with a .fileno() method which returns an integer SOCKET handle. File descriptors aren’t supported, and neither are handles that refer to anything besides a SOCKET.
Raises
trio.BusyResourceError – if another task is already waiting for the given socket to become readable.
trio.ClosedResourceError – if another task calls notify_closing() while this function is still working.
await trio.lowlevel.wait_readable(obj)
-
Block until the kernel reports that the given object is writable.
See wait_readable for the definition of obj.
Raises
trio.BusyResourceError – if another task is already waiting for the given socket to become writable.
trio.ClosedResourceError – if another task calls notify_closing() while this function is still working.
await trio.lowlevel.wait_writable(obj)
-
Call this before closing a file descriptor (on Unix) or socket (on Windows). This will cause any wait_readable or wait_writable calls on the given object to immediately wake up and raise ClosedResourceError.
This doesn’t actually close the object – you still have to do that yourself afterwards. Also, you want to be careful to make sure no new tasks start waiting on the object in between when you call this and when it’s actually closed. So to close something properly, you usually want to do these steps in order:
1. Explicitly mark the object as closed, so that any new attempts to use it will abort before they start.
2. Call notify_closing to wake up any already-existing users.
3. Actually close the object.
It’s also possible to do them in a different order if that’s more convenient, but only if you make sure not to have any checkpoints in between the steps. This way they all happen in a single atomic step, so other tasks won’t be able to tell what order they happened in anyway.
trio.lowlevel.notify_closing(obj)
Unix-specific API
FdStream supports wrapping Unix files (such as a pipe or TTY) as a stream.
If you have two different file descriptors for sending and receiving, and want to bundle them together into a single bidirectional Stream, then use trio.StapledStream:

    bidirectional_stream = trio.StapledStream(
        trio.lowlevel.FdStream(write_fd),
        trio.lowlevel.FdStream(read_fd)
    )
-
Bases: trio.abc.Stream
Represents a stream given the file descriptor to a pipe, TTY, etc.
fd must refer to a file that is open for reading and/or writing and supports non-blocking I/O (pipes and TTYs will work, on-disk files probably not). The returned stream takes ownership of the fd, so closing the stream will close the fd too. As with os.fdopen, you should not directly use an fd after you have wrapped it in a stream using this function.
To be used as a Trio stream, an open file must be placed in non-blocking mode. Unfortunately, this impacts all I/O that goes through the underlying open file, including I/O that uses a different file descriptor than the one that was passed to Trio. If other threads or processes are using file descriptors that are related through os.dup or inheritance across os.fork to the one that Trio is using, they are unlikely to be prepared to have non-blocking I/O semantics suddenly thrust upon them. For example, you can use FdStream(os.dup(sys.stdin.fileno())) to obtain a stream for reading from standard input, but it is only safe to do so with heavy caveats: your stdin must not be shared by any other processes and you must not make any calls to synchronous methods of sys.stdin until the stream returned by FdStream is closed. See issue #174 for a discussion of the challenges involved in relaxing this restriction.
class trio.lowlevel.FdStream(fd: int)
Kqueue-specific API
TODO: these are implemented, but are currently more of a sketch than anything real. See #26.
trio.lowlevel.current_kqueue()
await trio.lowlevel.wait_kevent(ident, filter, abort_func)
with trio.lowlevel.monitor_kevent(ident, filter) as queue
Windows-specific API
-
Async and cancellable variant of WaitForSingleObject. Windows only.
Parameters
handle – A Win32 object handle, as a Python integer.
Raises
OSError – If the handle is invalid, e.g. when it is already closed.
await trio.lowlevel.WaitForSingleObject(handle)
TODO: these are implemented, but are currently more of a sketch than anything real. See #26 and #52.
trio.lowlevel.register_with_iocp(handle)
await trio.lowlevel.wait_overlapped(handle, lpOverlapped)
trio.lowlevel.current_iocp()
with trio.lowlevel.monitor_completion_key() as queue
Global state: system tasks and run-local variables
-
The run-local variant of a context variable.
RunVar objects are similar to context variable objects, except that they are shared across a single call to trio.run() rather than a single task.
class trio.lowlevel.RunVar(name, default=
-
Spawn a “system” task.
System tasks have a few differences from regular tasks:
They don’t need an explicit nursery; instead they go into the internal “system nursery”.
If a system task raises an exception, then it’s converted into a TrioInternalError and all tasks are cancelled. If you write a system task, you should be careful to make sure it doesn’t crash.
System tasks are automatically cancelled when the main task exits.
By default, system tasks have KeyboardInterrupt protection enabled. If you want your task to be interruptible by control-C, then you need to use disable_ki_protection() explicitly (and come up with some plan for what to do with a KeyboardInterrupt, given that system tasks aren’t allowed to raise exceptions).
System tasks do not inherit context variables from their creator.
Towards the end of a call to trio.run(), after the main task and all system tasks have exited, the system nursery becomes closed. At this point, new calls to spawn_system_task() will raise RuntimeError("Nursery is closed to new arrivals") instead of creating a system task. It’s possible to encounter this state either in a finally block in an async generator, or in a callback passed to TrioToken.run_sync_soon() at the right moment.
Parameters
async_fn – An async callable.
args – Positional arguments for async_fn. If you want to pass keyword arguments, use functools.partial().
name – The name for this task. Only used for debugging/introspection (e.g. repr(task_obj)). If this isn’t a string, spawn_system_task() will try to make it one. A common use case is if you’re wrapping a function before spawning a new task, you might pass the original function as the name= to make debugging easier.
Returns
the newly spawned task
Return type
Task
trio.lowlevel.spawn_system_task(async_fn, *args, name=None)
Trio tokens
-
An opaque object representing a single call to trio.run().
It has no public constructor; instead, see current_trio_token().
This object has two uses:
It lets you re-enter the Trio run loop from external threads or signal handlers. This is the low-level primitive that trio.to_thread and trio.from_thread use to communicate with worker threads, that trio.open_signal_receiver uses to receive notifications about signals, and so forth.
Each call to trio.run() has exactly one associated TrioToken object, so you can use it to identify a particular call.
-
Schedule a call to sync_fn(*args) to occur in the context of a Trio task.
This is safe to call from the main thread, from other threads, and from signal handlers. This is the fundamental primitive used to re-enter the Trio run loop from outside of it.
The call will happen “soon”, but there’s no guarantee about exactly when, and no mechanism provided for finding out when it’s happened. If you need this, you’ll have to build your own.
The call is effectively run as part of a system task (see spawn_system_task()). In particular this means that:
KeyboardInterrupt protection is enabled by default; if you want sync_fn to be interruptible by control-C, then you need to use disable_ki_protection() explicitly.
If sync_fn raises an exception, then it’s converted into a TrioInternalError and all tasks are cancelled. You should be careful that sync_fn doesn’t crash.
All calls with idempotent=False are processed in strict first-in first-out order.
If idempotent=True, then sync_fn and args must be hashable, and Trio will make a best-effort attempt to discard any call submission which is equal to an already-pending call. Trio will process these in first-in first-out order.
Any ordering guarantees apply separately to idempotent=False and idempotent=True calls; there’s no rule for how calls in the different categories are ordered with respect to each other.
Raises
trio.RunFinishedError – if the associated call to trio.run() has already exited. (Any call that doesn’t raise this error is guaranteed to be fully processed before trio.run() exits.)
run_sync_soon(sync_fn, *args, idempotent=False)
class trio.lowlevel.TrioToken
-
Retrieve the TrioToken for the current call to trio.run().
trio.lowlevel.current_trio_token()
Spawning threads
-
Runs deliver(outcome.capture(fn)) in a worker thread.
Generally fn does some blocking work, and deliver delivers the result back to whoever is interested.
This is a low-level, no-frills interface, very similar to using threading.Thread to spawn a thread directly. The main difference is that this function tries to re-use threads when possible, so it can be a bit faster than threading.Thread.
Worker threads have the daemon flag set, which means that if your main thread exits, worker threads will automatically be killed. If you want to make sure that your fn runs to completion, then you should make sure that the main thread remains alive until deliver is called.
It is safe to call this function simultaneously from multiple threads.
Parameters
fn (sync function) – Performs arbitrary blocking work.
deliver (sync function) – Takes the outcome.Outcome of fn, and delivers it. Must not block.
Because worker threads are cached and reused for multiple calls, neither function should mutate thread-level state, like threading.local objects – or if they do, they should be careful to revert their changes before returning.
Note
The split between fn and deliver serves two purposes. First, it’s convenient, since most callers need something like this anyway.
Second, it avoids a small race condition that could cause too many threads to be spawned. Consider a program that wants to run several jobs sequentially on a thread, so the main thread submits a job, waits for it to finish, submits another job, etc. In theory, this program should only need one worker thread. But what could happen is:
1. Worker thread: First job finishes, and calls deliver.
2. Main thread: receives notification that the job finished, and calls start_thread_soon.
3. Main thread: sees that no worker threads are marked idle, so spawns a second worker thread.
4. Original worker thread: marks itself as idle.
To avoid this, threads mark themselves as idle before calling deliver.
Is this potential extra thread a major problem? Maybe not, but it’s easy enough to avoid, and we figure that if the user is trying to limit how many threads they’re using then it’s polite to respect that.
trio.lowlevel.start_thread_soon(fn, deliver)
Safer KeyboardInterrupt handling
Trio’s handling of control-C is designed to balance usability and safety. On the one hand, there are sensitive regions (like the core scheduling loop) where it’s simply impossible to handle arbitrary KeyboardInterrupt exceptions while maintaining our core correctness invariants. On the other, if the user accidentally writes an infinite loop, we do want to be able to break out of that. Our solution is to install a default signal handler which checks whether it’s safe to raise KeyboardInterrupt at the place where the signal is received. If so, then we do; otherwise, we schedule a KeyboardInterrupt to be delivered to the main task at the next available opportunity (similar to how Cancelled is delivered).
So that’s great, but – how do we know whether we’re in one of the sensitive parts of the program or not?
This is determined on a function-by-function basis. By default:
The top-level function in regular user tasks is unprotected.
The top-level function in system tasks is protected.
If a function doesn’t specify otherwise, then it inherits the protection state of its caller.
This means you only need to override the defaults at places where you transition from protected code to unprotected code or vice-versa.
These transitions are accomplished using two function decorators:
-
Decorator that marks the given regular function, generator function, async function, or async generator function as unprotected against KeyboardInterrupt, i.e., the code inside this function can be rudely interrupted by KeyboardInterrupt at any moment.
If you have multiple decorators on the same function, then this should be at the bottom of the stack (closest to the actual function).
An example of where you’d use this is in implementing something like trio.from_thread.run(), which uses TrioToken.run_sync_soon() to get into the Trio thread. run_sync_soon() callbacks are run with KeyboardInterrupt protection enabled, and trio.from_thread.run() takes advantage of this to safely set up the machinery for sending a response back to the original thread, but then uses disable_ki_protection() when entering the user-provided function.
@trio.lowlevel.disable_ki_protection
-
Decorator that marks the given regular function, generator function, async function, or async generator function as protected against KeyboardInterrupt, i.e., the code inside this function won’t be rudely interrupted by KeyboardInterrupt. (Though if it contains any checkpoints, then it can still receive KeyboardInterrupt at those. This is considered a polite interruption.)
Warning
Be very careful to only use this decorator on functions that you know will either exit in bounded time, or else pass through a checkpoint regularly. (Of course all of your functions should have this property, but if you mess it up here then you won’t even be able to use control-C to escape!)
If you have multiple decorators on the same function, then this should be at the bottom of the stack (closest to the actual function).
An example of where you’d use this is on the __exit__ implementation for something like a Lock, where a poorly-timed KeyboardInterrupt could leave the lock in an inconsistent state and cause a deadlock.
@trio.lowlevel.enable_ki_protection
-
Check whether the calling code has KeyboardInterrupt protection enabled.
It’s surprisingly easy to think that one’s KeyboardInterrupt protection is enabled when it isn’t, or vice-versa. This function tells you what Trio thinks of the matter, which makes it useful for asserts and unit tests.
Returns
True if protection is enabled, and False otherwise.
Return type
bool
trio.lowlevel.currently_ki_protected()
Sleeping and waking
Wait queue abstraction
-
A fair wait queue with cancellation and requeueing.
This class encapsulates the tricky parts of implementing a wait queue. It’s useful for implementing higher-level synchronization primitives like queues and locks.
In addition to the methods below, you can use len(parking_lot) to get the number of parked tasks, and if parking_lot: ... to check whether there are any parked tasks.
-
Park the current task until woken by a call to unpark() or unpark_all().
await park()
-
Move parked tasks from one ParkingLot object to another.
This dequeues count tasks from one lot, and requeues them on another, preserving order. For example:

    import trio
    import trio.testing

    async def parker(lot):
        print("sleeping")
        await lot.park()
        print("woken")

    async def main():
        lot1 = trio.lowlevel.ParkingLot()
        lot2 = trio.lowlevel.ParkingLot()
        async with trio.open_nursery() as nursery:
            nursery.start_soon(parker, lot1)
            await trio.testing.wait_all_tasks_blocked()
            assert len(lot1) == 1
            assert len(lot2) == 0
            lot1.repark(lot2)
            assert len(lot1) == 0
            assert len(lot2) == 1
            # This wakes up the task that was originally parked in lot1
            lot2.unpark()

If there are fewer than count tasks parked, then reparks as many tasks as are available and then returns successfully.
Parameters
new_lot (ParkingLot) – the parking lot to move tasks to.
count (int) – the number of tasks to move.
repark(new_lot, *, count=1)
-
Move all parked tasks from one ParkingLot object to another.
See repark() for details.
repark_all(new_lot)
-
Return an object containing debugging information.
Currently the following fields are defined:
tasks_waiting: The number of tasks blocked on this lot’s park() method.
statistics()
-
Unpark one or more tasks.
This wakes up count tasks that are blocked in park(). If there are fewer than count tasks parked, then wakes as many tasks as are available and then returns successfully.
Parameters
count (int) – the number of tasks to unpark.
unpark(*, count=1)
-
Unpark all parked tasks.
unpark_all()
-
class trio.lowlevel.ParkingLot
Low-level checkpoint functions
-
A pure checkpoint.
This checks for cancellation and allows other tasks to be scheduled, without otherwise blocking.
Note that the scheduler has the option of ignoring this and continuing to run the current task if it decides this is appropriate (e.g. for increased efficiency).
Equivalent to await trio.sleep(0) (which is implemented by calling checkpoint().)
await trio.lowlevel.checkpoint()
The next two functions are used together to make up a checkpoint:
-
Issue a checkpoint if the calling context has been cancelled.
Equivalent to (but potentially more efficient than):

    if trio.current_effective_deadline() == -inf:
        await trio.lowlevel.checkpoint()

This is either a no-op, or else it allows other tasks to be scheduled and then raises trio.Cancelled.
Typically used together with cancel_shielded_checkpoint().
await trio.lowlevel.checkpoint_if_cancelled()
-
Introduce a schedule point, but not a cancel point.
This is not a checkpoint, but it is half of a checkpoint, and when combined with checkpoint_if_cancelled() it can make a full checkpoint.
Equivalent to (but potentially more efficient than):

    with trio.CancelScope(shield=True):
        await trio.lowlevel.checkpoint()
await trio.lowlevel.cancel_shielded_checkpoint()
These are commonly used in cases where you have an operation that might-or-might-not block, and you want to implement Trio’s standard checkpoint semantics. Example:

    async def operation_that_maybe_blocks():
        await checkpoint_if_cancelled()
        try:
            ret = attempt_operation()
        except BlockingIOError:
            # need to block and then retry, which we do below
            pass
        else:
            # operation succeeded, finish the checkpoint then return
            await cancel_shielded_checkpoint()
            return ret
        while True:
            await wait_for_operation_to_be_ready()
            try:
                return attempt_operation()
            except BlockingIOError:
                pass

This logic is a bit convoluted, but accomplishes all of the following:
Every successful execution path passes through a checkpoint (assuming that wait_for_operation_to_be_ready is an unconditional checkpoint).
Our cancellation semantics say that Cancelled should only be raised if the operation didn’t happen. Using cancel_shielded_checkpoint() on the early-exit branch accomplishes this.
On the path where we do end up blocking, we don’t pass through any schedule points before that, which avoids some unnecessary work.
Avoids implicitly chaining the BlockingIOError with any errors raised by attempt_operation or wait_for_operation_to_be_ready, by keeping the while True: loop outside of the except BlockingIOError: block.
These functions can also be useful in other situations. For example, when trio.to_thread.run_sync() schedules some work to run in a worker thread, it blocks until the work is finished (so it’s a schedule point), but by default it doesn’t allow cancellation. So to make sure that the call always acts as a checkpoint, it calls checkpoint_if_cancelled() before starting the thread.
Low-level blocking
-
Put the current task to sleep, with cancellation support.
This is the lowest-level API for blocking in Trio. Every time a Task blocks, it does so by calling this function (usually indirectly via some higher-level API).
This is a tricky interface with no guard rails. If you can use ParkingLot or the built-in I/O wait functions instead, then you should.
Generally the way it works is that before calling this function, you make arrangements for “someone” to call reschedule() on the current task at some later point.
Then you call wait_task_rescheduled(), passing in abort_func, an “abort callback”.
(Terminology: in Trio, “aborting” is the process of attempting to interrupt a blocked task to deliver a cancellation.)
There are two possibilities for what happens next:
“Someone” calls reschedule() on the current task, and wait_task_rescheduled() returns or raises whatever value or error was passed to reschedule().
The call’s context transitions to a cancelled state (e.g. due to a timeout expiring). When this happens, the abort_func is called. Its interface looks like:

    def abort_func(raise_cancel):
        ...
        return trio.lowlevel.Abort.SUCCEEDED  # or FAILED

It should attempt to clean up any state associated with this call, and in particular, arrange that reschedule() will not be called later. If (and only if!) it is successful, then it should return Abort.SUCCEEDED, in which case the task will automatically be rescheduled with an appropriate Cancelled error.
Otherwise, it should return Abort.FAILED. This means that the task can’t be cancelled at this time, and still has to make sure that “someone” eventually calls reschedule().
At that point there are again two possibilities. You can simply ignore the cancellation altogether: wait for the operation to complete and then reschedule and continue as normal. (For example, this is what trio.to_thread.run_sync() does if cancellation is disabled.) The other possibility is that the abort_func does succeed in cancelling the operation, but for some reason isn’t able to report that right away. (Example: on Windows, it’s possible to request that an async (“overlapped”) I/O operation be cancelled, but this request is also asynchronous – you don’t find out until later whether the operation was actually cancelled or not.) To report a delayed cancellation, you should reschedule the task yourself, and call the raise_cancel callback passed to abort_func to raise a Cancelled (or possibly KeyboardInterrupt) exception into this task. Either of the approaches sketched below can work:

    # Option 1:
    # Catch the exception from raise_cancel and inject it into the task.
    # (This is what Trio does automatically for you if you return
    # Abort.SUCCEEDED.)
    trio.lowlevel.reschedule(task, outcome.capture(raise_cancel))

    # Option 2:
    # wait to be woken by "someone", and then decide whether to raise
    # the error from inside the task.
    outer_raise_cancel = None
    def abort(inner_raise_cancel):
        nonlocal outer_raise_cancel
        outer_raise_cancel = inner_raise_cancel
        TRY_TO_CANCEL_OPERATION()
        return trio.lowlevel.Abort.FAILED
    await wait_task_rescheduled(abort)
    if OPERATION_WAS_SUCCESSFULLY_CANCELLED:
        # raises the error
        outer_raise_cancel()

In any case it’s guaranteed that we only call the abort_func at most once per call to wait_task_rescheduled().
Sometimes, it’s useful to be able to share some mutable sleep-related data between the sleeping task, the abort function, and the waking task. You can use the sleeping task’s custom_sleep_data attribute to store this data, and Trio won’t touch it, except to make sure that it gets cleared when the task is rescheduled.
Warning
If your abort_func raises an error, or returns any value other than Abort.SUCCEEDED or Abort.FAILED, then Trio will crash violently. Be careful! Similarly, it is entirely possible to deadlock a Trio program by failing to reschedule a blocked task, or cause havoc by calling reschedule() too many times. Remember what we said up above about how you should use a higher-level API if at all possible?
await trio.lowlevel.wait_task_rescheduled(abort_func)
-
enum.Enum used as the return value from abort functions.
See wait_task_rescheduled() for details.
SUCCEEDED
FAILED
class trio.lowlevel.Abort(value)
-
Reschedule the given task with the given outcome.Outcome.
See wait_task_rescheduled() for the gory details.
There must be exactly one call to reschedule() for every call to wait_task_rescheduled(). (And when counting, keep in mind that returning Abort.SUCCEEDED from an abort callback is equivalent to calling reschedule() once.)
Parameters
task (trio.lowlevel.Task) – the task to be rescheduled. Must be blocked in a call to wait_task_rescheduled().
next_send (outcome.Outcome) – the value (or error) to return (or raise) from wait_task_rescheduled().
trio.lowlevel.reschedule(task, next_send=
Here’s an example lock class implemented using wait_task_rescheduled() directly. This implementation has a number of flaws, including lack of fairness, O(n) cancellation, missing error checking, failure to insert a checkpoint on the non-blocking path, etc. If you really want to implement your own lock, then you should study the implementation of trio.Lock and use ParkingLot, which handles some of these issues for you. But this does serve to illustrate the basic structure of the wait_task_rescheduled() API:
import collections

import trio

class NotVeryGoodLock:
    def __init__(self):
        self._blocked_tasks = collections.deque()
        self._held = False

    async def acquire(self):
        while self._held:
            task = trio.lowlevel.current_task()
            self._blocked_tasks.append(task)
            def abort_fn(_):
                # Cancelled while waiting: drop out of the queue (O(n))
                self._blocked_tasks.remove(task)
                return trio.lowlevel.Abort.SUCCEEDED
            await trio.lowlevel.wait_task_rescheduled(abort_fn)
        self._held = True

    def release(self):
        self._held = False
        if self._blocked_tasks:
            woken_task = self._blocked_tasks.popleft()
            trio.lowlevel.reschedule(woken_task)
Task API
-
Returns the current root Task.
This is the task that is the ultimate parent of all other tasks.
trio.lowlevel.current_root_task()
-
Return the Task object representing the current task.
Returns
the Task that called current_task().
Return type
Task
-
trio.lowlevel.current_task()
-
A Task object represents a concurrent “thread” of execution. It has no public constructor; Trio internally creates a Task object for each call to nursery.start(...) or nursery.start_soon(...).
Its public members are mostly useful for introspection and debugging:
-
String containing this Task's name. Usually the name of the function this Task is running, but can be overridden by passing name= to start or start_soon.
name
-
This task’s coroutine object. Example usage: extracting a stack trace:
import traceback

def walk_coro_stack(coro):
    while coro is not None:
        if hasattr(coro, "cr_frame"):
            # A real coroutine
            yield coro.cr_frame, coro.cr_frame.f_lineno
            coro = coro.cr_await
        else:
            # A generator decorated with @types.coroutine
            yield coro.gi_frame, coro.gi_frame.f_lineno
            coro = coro.gi_yieldfrom

def print_stack_for_task(task):
    ss = traceback.StackSummary.extract(walk_coro_stack(task.coro))
    print("".join(ss.format()))
coro
-
This task’s
contextvars.Context
object.
context
-
The nursery this task is inside (or None if this is the “init” task).
Example use case: drawing a visualization of the task tree in a debugger.
parent_nursery
-
The nursery this task will be inside after it calls task_status.started().
If this task has already called started(), or if it was not spawned using nursery.start(), then its eventual_parent_nursery is None.
eventual_parent_nursery
-
The nurseries this task contains.
This is a list, with outer nurseries before inner nurseries.
child_nurseries
-
Trio doesn’t assign this variable any meaning, except that it sets it to None whenever a task is rescheduled. It can be used to share data between the different tasks involved in putting a task to sleep and then waking it up again. (See wait_task_rescheduled() for details.)
custom_sleep_data
-
class trio.lowlevel.Task
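The stack-walking technique shown for Task.coro can be tried without a running Trio program, since it relies only on standard coroutine attributes. The sketch below drives a plain coroutine by hand and lists the functions on its suspended call stack; the helper names suspend, inner, outer, and suspended_function_names are made up for this example.

```python
import traceback
import types


@types.coroutine
def suspend():
    # A generator-based awaitable: the innermost frame, where the stack yields.
    yield "suspended"


async def inner():
    await suspend()


async def outer():
    await inner()


def walk_coro_stack(coro):
    while coro is not None:
        if hasattr(coro, "cr_frame"):
            # A real coroutine
            yield coro.cr_frame, coro.cr_frame.f_lineno
            coro = coro.cr_await
        else:
            # A generator decorated with @types.coroutine
            yield coro.gi_frame, coro.gi_frame.f_lineno
            coro = coro.gi_yieldfrom


def suspended_function_names(coro):
    summary = traceback.StackSummary.extract(walk_coro_stack(coro))
    return [frame.name for frame in summary]


def demo():
    coro = outer()
    coro.send(None)  # run until the first suspension point
    try:
        return suspended_function_names(coro)
    finally:
        coro.close()
```

With a real Trio task you would pass task.coro instead of driving a coroutine manually.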
Using “guest mode” to run Trio on top of other event loops
What is “guest mode”?
An event loop acts as a central coordinator to manage all the IO happening in your program. Normally, that means that your application has to pick one event loop, and use it for everything. But what if you like Trio, but also need to use a framework like Qt or PyGame that has its own event loop? Then you need some way to run both event loops at once.
It is possible to combine event loops, but the standard approaches all have significant downsides:
Polling: this is where you use a busy-loop to manually check for IO on both event loops many times per second. This adds latency, and wastes CPU time and electricity.
Pluggable IO backends: this is where you reimplement one of the event loop APIs on top of the other, so you effectively end up with just one event loop. This requires a significant amount of work for each pair of event loops you want to integrate, and different backends inevitably end up with inconsistent behavior, forcing users to program against the least-common-denominator. And if the two event loops expose different feature sets, it may not even be possible to implement one in terms of the other.
Running the two event loops in separate threads: This works, but most event loop APIs aren’t thread-safe, so in this approach you need to keep careful track of which code runs on which event loop, and remember to use explicit inter-thread messaging whenever you interact with the other loop – or else risk obscure race conditions and data corruption.
That’s why Trio offers a fourth option: guest mode. Guest mode lets you execute trio.run
on top of some other “host” event loop, like Qt. Its advantages are:
Efficiency: guest mode is event-driven instead of using a busy-loop, so it has low latency and doesn’t waste electricity.
-
No need to think about threads: your Trio code runs in the same thread as the host event loop, so you can freely call sync Trio APIs from the host, and call sync host APIs from Trio. For example, if you’re making a GUI app with Qt as the host loop, then making a cancel button and connecting it to a trio.CancelScope is as easy as writing:
# Trio code can create Qt objects without any special ceremony...
my_cancel_button = QPushButton("Cancel")
# ...and Qt can call back to Trio just as easily
my_cancel_button.clicked.connect(my_cancel_scope.cancel)
(For async APIs, it’s not that simple, but you can use sync APIs to build explicit bridges between the two worlds, e.g. by passing async functions and their results back and forth through queues.)
Consistent behavior: guest mode uses the same code as regular Trio: the same scheduler, same IO code, same everything. So you get the full feature set and everything acts the way you expect.
Simple integration and broad compatibility: pretty much every event loop offers some threadsafe “schedule a callback” operation, and that’s all you need to use it as a host loop.
Really? How is that possible?
Note
You can use guest mode without reading this section. It’s included for those who enjoy understanding how things work.
All event loops have the same basic structure. They loop through two operations, over and over:
Wait for the operating system to notify them that something interesting has happened, like data arriving on a socket or a timeout passing. They do this by invoking a platform-specific sleep_until_something_happens() system call – select, epoll, kqueue, GetQueuedCompletionStatusEx, etc.
Run all the user tasks that care about whatever happened, then go back to step 1.
The problem here is step 1. Two different event loops on the same thread can take turns running user tasks in step 2, but when they’re idle and nothing is happening, they can’t both invoke their own sleep_until_something_happens()
function at the same time.
The “polling” and “pluggable backend” strategies solve this by hacking the loops so both step 1s can run at the same time in the same thread. Keeping everything in one thread is great for step 2, but the step 1 hacks create problems.
The “separate threads” strategy solves this by moving both steps into separate threads. This makes step 1 work, but the downside is that now the user tasks in step 2 are running in separate threads as well, so users are forced to deal with inter-thread coordination.
The idea behind guest mode is to combine the best parts of each approach: we move Trio’s step 1 into a separate worker thread, while keeping Trio’s step 2 in the main host thread. This way, when the application is idle, both event loops do their sleep_until_something_happens()
at the same time in their own threads. But when the app wakes up and your code is actually running, it all happens in a single thread. The threading trickiness is all handled transparently inside Trio.
Concretely, we unroll Trio’s internal event loop into a chain of callbacks, and as each callback finishes, it schedules the next callback onto the host loop or a worker thread as appropriate. So the only thing the host loop has to provide is a way to schedule a callback onto the main thread from a worker thread.
Coordinating between Trio and the host loop does add some overhead. The main cost is switching in and out of the background thread, since this requires cross-thread messaging. This is cheap (on the order of a few microseconds, assuming your host loop is implemented efficiently), but it’s not free.
But, there’s a nice optimization we can make: we only need the thread when our sleep_until_something_happens()
call actually sleeps, that is, when the Trio part of your program is idle and has nothing to do. So before we switch into the worker thread, we double-check whether we’re idle, and if not, then we skip the worker thread and jump directly to step 2. This means that your app only pays the extra thread-switching penalty at moments when it would otherwise be sleeping, so it should have minimal effect on your app’s overall performance.
The total overhead will depend on your host loop, your platform, your application, etc. But we expect that in most cases, apps running in guest mode should only be 5-10% slower than the same code using trio.run
. If you find that’s not true for your app, then please let us know and we’ll see if we can fix it!
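The callback chain described in this section can be modelled with the standard library alone. The sketch below is a toy, not Trio’s actual implementation: a queue.Queue stands in for the operating system, events.get() plays the role of sleep_until_something_happens(), and the names start_toy_guest, step1_wait, step1_block_in_worker, and step2_run are invented for the example. It shows step 1 blocking in a worker thread, step 2 always running in the host thread, and the optimization of skipping the thread when work is already pending.

```python
import asyncio
import queue
import threading


def start_toy_guest(events, handle_event, run_sync_soon_threadsafe, done_callback):
    """Toy stand-in for a guest run; call it from the host thread.

    A None event means "stop".
    """
    host_thread = threading.current_thread()
    results = []

    def step1_wait():
        # Runs in the host thread. If an event is already pending we are not
        # idle, so skip the worker thread and go straight to step 2.
        try:
            event = events.get_nowait()
        except queue.Empty:
            threading.Thread(target=step1_block_in_worker, daemon=True).start()
        else:
            run_sync_soon_threadsafe(lambda: step2_run(event))

    def step1_block_in_worker():
        # Runs in a worker thread: sleep until something happens, then hand
        # the event back to the host thread.
        event = events.get()
        run_sync_soon_threadsafe(lambda: step2_run(event))

    def step2_run(event):
        # Always runs in the host thread, like any other host callback.
        assert threading.current_thread() is host_thread
        if event is None:
            done_callback(results)
            return
        results.append(handle_event(event))
        step1_wait()

    run_sync_soon_threadsafe(step1_wait)


async def host_main():
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    events = queue.Queue()
    start_toy_guest(events, lambda n: n * 2, loop.call_soon_threadsafe, done.set_result)
    for n in (1, 2, 3):
        events.put(n)
        await asyncio.sleep(0.01)  # the host loop stays free to do its own work
    events.put(None)
    return await done


def run_demo():
    return asyncio.run(host_main())
```

The only thing the toy asks of its host is a thread-safe way to schedule a callback, which mirrors the requirement stated above.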
Implementing guest mode for your favorite event loop
Let’s walk through what you need to do to integrate Trio’s guest mode with your favorite event loop. Treat this section like a checklist.
Getting started: The first step is to get something basic working. Here’s a minimal example of running Trio on top of asyncio, that you can use as a model:
import asyncio, trio

# A tiny Trio program
async def trio_main():
    for i in range(5):
        print(f"Hello from Trio!")
        # This is inside Trio, so we have to use Trio APIs
        await trio.sleep(1)
    return "trio done!"

# The code to run it as a guest inside asyncio
async def asyncio_main():
    asyncio_loop = asyncio.get_running_loop()

    def run_sync_soon_threadsafe(fn):
        asyncio_loop.call_soon_threadsafe(fn)

    def done_callback(trio_main_outcome):
        print(f"Trio program ended with: {trio_main_outcome}")

    # This is where the magic happens:
    trio.lowlevel.start_guest_run(
        trio_main,
        run_sync_soon_threadsafe=run_sync_soon_threadsafe,
        done_callback=done_callback,
    )

    # Let the host loop run for a while to give trio_main time to
    # finish. (WARNING: This is a hack. See below for better
    # approaches.)
    #
    # This function is in asyncio, so we have to use asyncio APIs.
    await asyncio.sleep(10)

asyncio.run(asyncio_main())
You can see we’re using asyncio-specific APIs to start up a loop, and then we call trio.lowlevel.start_guest_run
. This function is very similar to trio.run
, and takes all the same arguments. But it has two differences:
First, instead of blocking until trio_main
has finished, it schedules trio_main
to start running on top of the host loop, and then returns immediately. So trio_main
is running in the background – that’s why we have to sleep and give it time to finish.
And second, it requires two extra keyword arguments: run_sync_soon_threadsafe
, and done_callback
.
For run_sync_soon_threadsafe
, we need a function that takes a synchronous callback, and schedules it to run on your host loop. And this function needs to be “threadsafe” in the sense that you can safely call it from any thread. So you need to figure out how to write a function that does that using your host loop’s API. For asyncio, this is easy because call_soon_threadsafe
does exactly what we need; for your loop, it might be more or less complicated.
For done_callback
, you pass in a function that Trio will automatically invoke when the Trio run finishes, so you know it’s done and what happened. For this basic starting version, we just print the result; in the next section we’ll discuss better alternatives.
At this stage you should be able to run a simple Trio program inside your host loop. Now we’ll turn that prototype into something solid.
Loop lifetimes: One of the trickiest things in most event loops is shutting down correctly. And having two event loops makes this even harder!
If you can, we recommend following this pattern:
Start up your host loop
Immediately call start_guest_run to start Trio
When Trio finishes and your done_callback is invoked, shut down the host loop
Make sure that nothing else shuts down your host loop
This way, your two event loops have the same lifetime, and your program automatically exits when your Trio function finishes.
Here’s how we’d extend our asyncio example to implement this pattern:
# Improved version, that shuts down properly after Trio finishes
async def asyncio_main():
    asyncio_loop = asyncio.get_running_loop()

    def run_sync_soon_threadsafe(fn):
        asyncio_loop.call_soon_threadsafe(fn)

    # Revised 'done' callback: set a Future
    done_fut = asyncio.Future()
    def done_callback(trio_main_outcome):
        done_fut.set_result(trio_main_outcome)

    trio.lowlevel.start_guest_run(
        trio_main,
        run_sync_soon_threadsafe=run_sync_soon_threadsafe,
        done_callback=done_callback,
    )

    # Wait for the guest run to finish
    trio_main_outcome = await done_fut
    # Pass through the return value or exception from the guest run
    return trio_main_outcome.unwrap()
And then you can encapsulate all this machinery in a utility function that exposes a trio.run
-like API, but runs both loops together:
def trio_run_with_asyncio(trio_main, *args, **trio_run_kwargs):
    async def asyncio_main():
        # same as above
        ...

    return asyncio.run(asyncio_main())
Technically, it is possible to use other patterns. But there are some important limitations you have to respect:
-
You must let the Trio program run to completion. Many event loops let you stop the event loop at any point, and any pending callbacks/tasks/etc. just… don’t run. Trio follows a more structured system, where you can cancel things, but the code always runs to completion, so finally blocks run, resources are cleaned up, etc. If you stop your host loop early, before the done_callback is invoked, then that cuts off the Trio run in the middle without a chance to clean up. This can leave your code in an inconsistent state, and will definitely leave Trio’s internals in an inconsistent state, which will cause errors if you try to use Trio again in that thread.
Some programs need to be able to quit at any time, for example in response to a GUI window being closed or a user selecting “Quit” from a menu. In these cases, we recommend wrapping your whole program in a trio.CancelScope, and cancelling it when you want to quit.
Each host loop can only have one start_guest_run at a time. If you try to start a second one, you’ll get an error. If you need to run multiple Trio functions at the same time, then start up a single Trio run, open a nursery, and then start your functions as child tasks in that nursery.
Unless you or your host loop register a handler for signal.SIGINT before starting Trio (this is not common), Trio will take over delivery of KeyboardInterrupts. And since Trio can’t tell which host code is safe to interrupt, it will only deliver KeyboardInterrupt into the Trio part of your code. This is fine if your program is set up to exit when the Trio part exits, because the KeyboardInterrupt will propagate out of Trio and then trigger the shutdown of your host loop, which is just what you want.
Given these constraints, we think the simplest approach is to always start and stop the two loops together.
Signal management: “Signals” are a low-level inter-process communication primitive. When you hit control-C to kill a program, that uses a signal. Signal handling in Python has a lot of moving parts. One of those parts is signal.set_wakeup_fd
, which event loops use to make sure that they wake up when a signal arrives so they can respond to it. (If you’ve ever had an event loop ignore you when you hit control-C, it was probably because they weren’t using signal.set_wakeup_fd
correctly.)
But, only one event loop can use signal.set_wakeup_fd
at a time. And in guest mode that can cause problems: Trio and the host loop might start fighting over who’s using signal.set_wakeup_fd
.
Some event loops, like asyncio, won’t work correctly unless they win this fight. Fortunately, Trio is a little less picky: as long as someone makes sure that the program wakes up when a signal arrives, it should work correctly. So if your host loop wants signal.set_wakeup_fd
, then you should disable Trio’s signal.set_wakeup_fd
support, and then both loops will work correctly.
On the other hand, if your host loop doesn’t use signal.set_wakeup_fd
, then the only way to make everything work correctly is to enable Trio’s signal.set_wakeup_fd
support.
By default, Trio assumes that your host loop doesn’t use signal.set_wakeup_fd
. It does try to detect when this creates a conflict with the host loop, and print a warning – but unfortunately, by the time it detects it, the damage has already been done. So if you’re getting this warning, then you should disable Trio’s signal.set_wakeup_fd
support by passing host_uses_signal_set_wakeup_fd=True
to start_guest_run
.
If you aren’t seeing any warnings with your initial prototype, you’re probably fine. But the only way to be certain is to check your host loop’s source. For example, asyncio may or may not use signal.set_wakeup_fd
depending on the Python version and operating system.
A small optimization: Finally, consider a small optimization. Some event loops offer two versions of their “call this function soon” API: one that can be used from any thread, and one that can only be used from the event loop thread, with the latter being cheaper. For example, asyncio has both call_soon_threadsafe
and call_soon
.
If you have a loop like this, then you can also pass a run_sync_soon_not_threadsafe=...
kwarg to start_guest_run
, and Trio will automatically use it when appropriate.
If your loop doesn’t have a split like this, then don’t worry about it; run_sync_soon_not_threadsafe=
is optional. (If it’s not passed, then Trio will just use your threadsafe version in all cases.)
That’s it! If you’ve followed all these steps, you should now have a cleanly-integrated hybrid event loop. Go make some cool GUIs/games/whatever!
Limitations
In general, almost all Trio features should work in guest mode. The exception is features which rely on Trio having a complete picture of everything that your program is doing, since obviously, it can’t control the host loop or see what it’s doing.
Custom clocks can be used in guest mode, but they only affect Trio timeouts, not host loop timeouts. And the autojump clock and related trio.testing.wait_all_tasks_blocked can technically be used in guest mode, but they’ll only take Trio tasks into account when deciding whether to jump the clock or whether all tasks are blocked.
Reference
-
Start a “guest” run of Trio on top of some other “host” event loop.
Each host loop can only have one guest run at a time.
You should always let the Trio run finish before stopping the host loop; if not, it may leave Trio’s internal data structures in an inconsistent state. You might be able to get away with it if you immediately exit the program, but it’s safest not to go there in the first place.
Generally, the best way to do this is to wrap this in a function that starts the host loop and then immediately starts the guest run, and then shuts down the host when the guest run completes.
Parameters
run_sync_soon_threadsafe – An arbitrary callable, which will be passed a function as its sole argument:
def my_run_sync_soon_threadsafe(fn):
    ...
This callable should schedule fn() to be run by the host on its next pass through its loop. Must support being called from arbitrary threads.
done_callback – An arbitrary callable:
def my_done_callback(run_outcome):
    ...
When the Trio run has finished, Trio will invoke this callback to let you know. The argument is an outcome.Outcome, reporting what would have been returned or raised by trio.run. This function can do anything you want, but commonly you’ll want it to shut down the host loop, unwrap the outcome, etc.
run_sync_soon_not_threadsafe – Like run_sync_soon_threadsafe, but will only be called from inside the host loop’s main thread. Optional, but if your host loop allows you to implement this more efficiently than run_sync_soon_threadsafe then passing it will make things a bit faster.
host_uses_signal_set_wakeup_fd (bool) – Pass True if your host loop uses signal.set_wakeup_fd, and False otherwise. For more details, see Implementing guest mode for your favorite event loop.
For the meaning of other arguments, see trio.run.
-
trio.lowlevel.start_guest_run(async_fn, *args, run_sync_soon_threadsafe, done_callback, run_sync_soon_not_threadsafe=None, host_uses_signal_set_wakeup_fd=False, clock=None, instruments=(), restrict_keyboard_interrupt_to_checkpoints=False)
Handing off live coroutine objects between coroutine runners
Internally, Python’s async/await syntax is built around the idea of “coroutine objects” and “coroutine runners”. A coroutine object represents the state of an async callstack. But by itself, this is just a static object that sits there. If you want it to do anything, you need a coroutine runner to push it forward. Every Trio task has an associated coroutine object (see Task.coro
), and the Trio scheduler acts as their coroutine runner.
But of course, Trio isn’t the only coroutine runner in Python – asyncio
has one, other event loops have them, you can even define your own.
And in some very, very unusual circumstances, it even makes sense to transfer a single coroutine object back and forth between different coroutine runners. That’s what this section is about. This is an extremely exotic use case, and assumes a lot of expertise in how Python async/await works internally. For motivating examples, see trio-asyncio issue #42, and trio issue #649. For more details on how coroutines work, we recommend André Caron’s A tale of event loops, or going straight to PEP 492 for the full details.
-
Permanently detach the current task from the Trio scheduler.
Normally, a Trio task doesn’t exit until its coroutine object exits. When you call this function, Trio acts like the coroutine object just exited and the task terminates with the given outcome. This is useful if you want to permanently switch the coroutine object over to a different coroutine runner.
When the calling coroutine enters this function it’s running under Trio, and when the function returns it’s running under the foreign coroutine runner.
You should make sure that the coroutine object has released any Trio-specific resources it has acquired (e.g. nurseries).
-
Parameters
final_outcome (outcome.Outcome) – Trio acts as if the current task exited with the given return value or exception.
Returns or raises whatever value or exception the new coroutine runner uses to resume the coroutine.
-
await trio.lowlevel.permanently_detach_coroutine_object(final_outcome)
-
Temporarily detach the current coroutine object from the Trio scheduler.
When the calling coroutine enters this function it’s running under Trio, and when the function returns it’s running under the foreign coroutine runner.
The Trio Task will continue to exist, but will be suspended until you use reattach_detached_coroutine_object() to resume it. In the meantime, you can use another coroutine runner to schedule the coroutine object. In fact, you have to – the function doesn’t return until the coroutine is advanced from outside.
Note that you’ll need to save the current Task object to later resume; you can retrieve it with current_task(). You can also use this Task object to retrieve the coroutine object – see Task.coro.
Parameters
abort_func – Same as for wait_task_rescheduled(), except that it must return Abort.FAILED. (If it returned Abort.SUCCEEDED, then Trio would attempt to reschedule the detached task directly without going through reattach_detached_coroutine_object(), which would be bad.) Your abort_func should still arrange for whatever the coroutine object is doing to be cancelled, and then reattach to Trio and call the raise_cancel callback, if possible.
Returns or raises whatever value or exception the new coroutine runner uses to resume the coroutine.
-
await trio.lowlevel.temporarily_detach_coroutine_object(abort_func)
-
Reattach a coroutine object that was detached using temporarily_detach_coroutine_object().
When the calling coroutine enters this function it’s running under the foreign coroutine runner, and when the function returns it’s running under Trio.
This must be called from inside the coroutine being resumed, and yields whatever value you pass in. (Presumably you’ll pass a value that will cause the current coroutine runner to stop scheduling this task.) Then the coroutine is resumed by the Trio scheduler at the next opportunity.
await trio.lowlevel.reattach_detached_coroutine_object(task, yield_value)
© 2017 Nathaniel J. Smith
Licensed under the MIT License.
https://trio.readthedocs.io/en/v0.18.0/reference-lowlevel.html