std.concurrency

This is a low-level messaging API upon which more structured or restrictive APIs may be built. The general idea is that every messageable entity is represented by a common handle type called a Tid, which allows messages to be sent to logical threads that are executing in both the current process and in external processes using the same interface. This is an important aspect of scalability because it allows the components of a program to be spread across available resources with few to no changes to the actual implementation.

A logical thread is an execution context that has its own stack and which runs asynchronously to other logical threads. These may be preemptively scheduled kernel threads, fibers (cooperative user-space threads), or some other concept with similar behavior.

The type of concurrency used when logical threads are created is determined by the Scheduler selected at initialization time. The default behavior is currently to create a new kernel thread per call to spawn, but other schedulers are available that multiplex fibers across the main thread or use some combination of the two approaches.

License:
Boost License 1.0.
Authors:
Sean Kelly, Alex Rønne Petersen, Martin Nowak
Source
std/concurrency.d
Examples:
__gshared string received;
static void spawnedFunc(Tid ownerTid)
{
    import std.conv : text;
    // Receive a message from the owner thread.
    receive((int i){
        received = text("Received the number ", i);

        // Send a message back to the owner thread
        // indicating success.
        send(ownerTid, true);
    });
}

// Start spawnedFunc in a new thread.
auto childTid = spawn(&spawnedFunc, thisTid);

// Send the number 42 to this new thread.
send(childTid, 42);

// Receive the result code.
auto wasSuccessful = receiveOnly!(bool);
assert(wasSuccessful);
writeln(received); // "Received the number 42"
class MessageMismatch: object.Exception;

Thrown on calls to receiveOnly if the message received is not of the type the receiving thread expected.

pure nothrow @nogc @safe this(string msg = "Unexpected message type");
class OwnerTerminated: object.Exception;

Thrown on calls to receive if the thread that spawned the receiving thread has terminated and no more messages exist.

pure nothrow @nogc @safe this(Tid t, string msg = "Owner terminated");
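As a rough illustration of when this exception surfaces, the sketch below has a middle thread spawn a grandchild and then return. The grandchild's receive call has nothing to match, so it can only finish by throwing OwnerTerminated. The names leaf, middle, and observeOwnerTermination are hypothetical and are not part of std.concurrency.

```d
import std.concurrency;

// Hypothetical grandchild: waits for an int that is never sent, so receive
// can only finish by throwing once its owner (the middle thread) is gone.
void leaf(Tid reportTo)
{
    try
    {
        receive((int i) { });
    }
    catch (OwnerTerminated)
    {
        reportTo.send("owner terminated");
    }
}

// Hypothetical middle thread: spawns the leaf (becoming its owner), then
// returns, which terminates this logical thread.
void middle(Tid reportTo)
{
    spawn(&leaf, reportTo);
}

// Hypothetical helper: returns the report sent by the leaf.
string observeOwnerTermination()
{
    spawn(&middle, thisTid);
    return receiveOnly!string();
}

void main()
{
    assert(observeOwnerTermination() == "owner terminated");
}
```

Catching the exception, as here, is one option. Another is to pass a handler taking OwnerTerminated to receive.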
class LinkTerminated: object.Exception;

Thrown if a linked thread has terminated.

pure nothrow @nogc @safe this(Tid t, string msg = "Link terminated");
class PriorityMessageException: object.Exception;

Thrown if a message was sent to a thread via std.concurrency.prioritySend and the receiver does not have a handler for a message of this type.

this(Variant vals);
Variant message;

The message that was sent.
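A minimal sketch of how this exception might be observed, assuming the calling thread sends a priority message to itself and then calls receive with a handler for a different type. The helper name unhandledPriorityPayload is hypothetical.

```d
import std.concurrency;

// Hypothetical helper: sends an unmatched priority message to the calling
// thread and returns the payload carried by the resulting exception.
string unhandledPriorityPayload()
{
    prioritySend(thisTid, "urgent");
    try
    {
        // Only int is handled, so the string priority message has no match.
        receive((int i) { });
    }
    catch (PriorityMessageException e)
    {
        // message is the Variant holding the value that was sent.
        return e.message.get!string;
    }
    return null;
}

void main()
{
    assert(unhandledPriorityPayload() == "urgent");
}
```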

class MailboxFull: object.Exception;

Thrown on mailbox crowding if the mailbox is configured with OnCrowding.throwException.

pure nothrow @nogc @safe this(Tid t, string msg = "Mailbox full");
class TidMissingException: object.Exception;

Thrown when a Tid is missing, e.g. when ownerTid doesn't find an owner thread.

struct Tid;

An opaque type used to represent a logical thread.

void toString(scope void delegate(const(char)[]) sink);

Generates a convenient string for identifying this Tid. This is only useful for checking whether Tids that are currently executing are the same or different, e.g. for logging and debugging. A Tid that executes in the future may have the same toString() output as a Tid that has already terminated.

@property @safe Tid thisTid();
Returns:
The Tid of the caller's thread.
@property Tid ownerTid();

Return the Tid of the thread which spawned the caller's thread.

Throws:
TidMissingException if there is no owner thread.
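A short sketch of both outcomes, assuming main was started by the runtime and not by spawn. The helpers hasOwner and reportOwner are hypothetical names used only for illustration.

```d
import std.concurrency;

// Hypothetical helper: reports whether the calling thread has an owner.
bool hasOwner()
{
    try
    {
        auto owner = ownerTid;
        return true;
    }
    catch (TidMissingException)
    {
        return false;
    }
}

// Hypothetical worker: tells its owner whether ownerTid could be resolved.
void reportOwner()
{
    ownerTid.send(hasOwner());
}

void main()
{
    // The main thread was not created by spawn, so it has no owner.
    assert(!hasOwner());

    // A spawned thread is owned by the thread that called spawn.
    spawn(&reportOwner);
    assert(receiveOnly!bool());
}
```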
Tid spawn(F, T...)(F fn, T args)
Constraints: if (isSpawnable!(F, T));

Starts fn(args) in a new logical thread.

Executes the supplied function in a new logical thread represented by Tid. The calling thread is designated as the owner of the new thread. When the owner thread terminates, an OwnerTerminated message will be sent to the new thread, causing an OwnerTerminated exception to be thrown on receive().

Parameters:
F fn The function to execute.
T args Arguments to the function.
Returns:
A Tid representing the new logical thread.
Notes
args must not have unshared aliasing. In other words, all arguments to fn must either be shared or immutable or have no pointer indirection. This is necessary for enforcing isolation among threads.
Examples:
static void f(string msg)
{
    writeln(msg); // "Hello World"
}

auto tid = spawn(&f, "Hello World");
Examples:
Fails: char[] has mutable aliasing.
string msg = "Hello, World!";

static void f1(string msg) {}
static assert(!__traits(compiles, spawn(&f1, msg.dup)));
static assert( __traits(compiles, spawn(&f1, msg.idup)));

static void f2(char[] msg) {}
static assert(!__traits(compiles, spawn(&f2, msg.dup)));
static assert(!__traits(compiles, spawn(&f2, msg.idup)));
Examples:
New thread with anonymous function
spawn({
    ownerTid.send("This is so great!");
});
writeln(receiveOnly!string); // "This is so great!"
Tid spawnLinked(F, T...)(F fn, T args)
Constraints: if (isSpawnable!(F, T));

Starts fn(args) in a logical thread and will receive a LinkTerminated message when the operation terminates.

Executes the supplied function in a new logical thread represented by Tid. This new thread is linked to the calling thread so that if either it or the calling thread terminates, a LinkTerminated message will be sent to the other, causing a LinkTerminated exception to be thrown on receive(). The owner relationship from spawn() is preserved as well, so if the link between threads is broken, owner termination will still result in an OwnerTerminated exception being thrown on receive().

Parameters:
F fn The function to execute.
T args Arguments to the function.
Returns:
A Tid representing the new thread.
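The following sketch shows one way the link might be observed from the spawning side, assuming a worker that returns immediately. shortLived and linkedChildReported are hypothetical names.

```d
import std.concurrency;

// Hypothetical worker that returns immediately, ending its logical thread.
void shortLived() { }

// Hypothetical helper: true if the LinkTerminated exception names the child.
bool linkedChildReported()
{
    auto child = spawnLinked(&shortLived);
    try
    {
        // No int is ever sent, so receive can only finish by throwing.
        receive((int i) { });
    }
    catch (LinkTerminated e)
    {
        // The exception carries the Tid of the thread that terminated.
        return e.tid == child;
    }
    return false;
}

void main()
{
    assert(linkedChildReported());
}
```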
void send(T...)(Tid tid, T vals);

Places the values as a message at the back of tid's message queue.

Sends the supplied value to the thread represented by tid. As with std.concurrency.spawn, T must not have unshared aliasing.

void prioritySend(T...)(Tid tid, T vals);

Places the values as a message on the front of tid's message queue.

Send a message to tid but place it at the front of tid's message queue instead of at the back. This function is typically used for out-of-band communication, to signal exceptional conditions, etc.
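To make the queue ordering concrete, the sketch below has a thread send two messages to itself, the second via prioritySend, and records the order in which they are received. It assumes the caller's mailbox holds no other int messages. receiveOrder is a hypothetical helper.

```d
import std.concurrency;

// Hypothetical helper: returns the order in which two self-sent messages
// are received when the second one is sent with prioritySend.
int[] receiveOrder()
{
    send(thisTid, 1);         // placed at the back of the queue
    prioritySend(thisTid, 2); // placed at the front of the queue

    int first = receiveOnly!int();
    int second = receiveOnly!int();
    return [first, second];
}

void main()
{
    // The priority message overtakes the ordinary one.
    assert(receiveOrder() == [2, 1]);
}
```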

void receive(T...)(T ops);

Receives a message from another thread.

Receive a message from another thread, or block if no messages of the specified types are available. This function works by pattern matching a message against a set of delegates and executing the first match found.

If a delegate that accepts a std.variant.Variant is included as the last argument to receive, it will match any message that was not matched by an earlier delegate. If more than one argument is sent, the Variant will contain a std.typecons.Tuple of all values sent.

Parameters:
T ops Variadic list of function pointers and delegates. Entries in this list must not occlude later entries.
Throws:
OwnerTerminated if the owner thread has terminated.
Examples:
import std.variant : Variant;

auto process = ()
{
    receive(
        (int i) { ownerTid.send(1); },
        (double f) { ownerTid.send(2); },
        (Variant v) { ownerTid.send(3); }
    );
};

{
    auto tid = spawn(process);
    send(tid, 42);
    writeln(receiveOnly!int); // 1
}

{
    auto tid = spawn(process);
    send(tid, 3.14);
    writeln(receiveOnly!int); // 2
}

{
    auto tid = spawn(process);
    send(tid, "something else");
    writeln(receiveOnly!int); // 3
}
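The Tuple packing described above for multi-value messages can be sketched as follows, assuming the thread sends itself an int and a string and handles them with a single Variant delegate. describeCatchAll is a hypothetical helper.

```d
import std.concurrency;
import std.conv : text;
import std.typecons : Tuple;
import std.variant : Variant;

// Hypothetical helper: sends two values to the calling thread and unpacks
// them from the Variant passed to the catch-all handler.
string describeCatchAll()
{
    send(thisTid, 1, "two");

    string result;
    receive(
        (Variant v)
        {
            // Both values arrive together as a Tuple!(int, string).
            auto t = v.get!(Tuple!(int, string));
            result = text(t[0], ":", t[1]);
        }
    );
    return result;
}

void main()
{
    assert(describeCatchAll() == "1:two");
}
```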
receiveOnlyRet!T receiveOnly(T...)();

Receives only messages with arguments of the specified types.

Parameters:
T Variadic list of types to be received.
Returns:
The received message. If T has more than one entry, the message will be packed into a std.typecons.Tuple.
Throws:
MessageMismatch if a message of types other than T is received, OwnerTerminated if the owner thread has terminated.
Examples:
auto tid = spawn(
{
    assert(receiveOnly!int == 42);
});
send(tid, 42);
Examples:
auto tid = spawn(
{
    assert(receiveOnly!string == "text");
});
send(tid, "text");
Examples:
struct Record { string name; int age; }

auto tid = spawn(
{
    auto msg = receiveOnly!(double, Record);
    assert(msg[0] == 0.5);
    assert(msg[1].name == "Alice");
    assert(msg[1].age == 31);
});

send(tid, 0.5, Record("Alice", 31));
bool receiveTimeout(T...)(Duration duration, T ops);

Receives a message from another thread and gives up if no match arrives within a specified duration.

Receive a message from another thread, or block until duration has elapsed if no messages of the specified types are available. This function works by pattern matching a message against a set of delegates and executing the first match found.

If a delegate that accepts a std.variant.Variant is included as the last argument, it will match any message that was not matched by an earlier delegate. If more than one argument is sent, the Variant will contain a std.typecons.Tuple of all values sent.

Parameters:
Duration duration How long to wait. If duration is negative, the call does not wait at all.
T ops Variadic list of function pointers and delegates. Entries in this list must not occlude later entries.
Returns:
true if it received a message and false if it timed out waiting for one.
Throws:
OwnerTerminated if the owner thread has terminated.
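A small sketch of both return values, assuming the calling thread's mailbox holds no int messages when the helper starts. timeoutResults is a hypothetical name.

```d
import core.time : msecs;
import std.concurrency;

// Hypothetical helper: calls receiveTimeout once with nothing to receive
// and once after sending a matching message to the calling thread.
bool[] timeoutResults()
{
    // Nothing matches, so this gives up after roughly 10 ms.
    bool first = receiveTimeout(10.msecs, (int i) { });

    send(thisTid, 7);
    // A matching message is already queued, so this succeeds.
    bool second = receiveTimeout(10.msecs, (int i) { });

    return [first, second];
}

void main()
{
    assert(timeoutResults() == [false, true]);
}
```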
enum OnCrowding: int;

These behaviors may be specified when a mailbox is full.

block

Wait until room is available.

throwException

Throw a MailboxFull exception.

ignore

Abort the send and return.

pure @safe void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis);

Sets a maximum mailbox size.

Sets a limit on the maximum number of user messages allowed in the mailbox. If this limit is reached, the caller attempting to add a new message will execute the behavior specified by doThis. If messages is zero, the mailbox is unbounded.

Parameters:
Tid tid The Tid of the thread for which this limit should be set.
size_t messages The maximum number of messages or zero if no limit.
OnCrowding doThis The behavior executed when a message is sent to a full mailbox.
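As an illustration of OnCrowding.throwException, the sketch below limits the calling thread's own mailbox to one message and then sends two. It assumes the mailbox is empty beforehand. overflowThrows is a hypothetical helper, and the limit is reset to zero (unbounded) on exit.

```d
import std.concurrency;

// Hypothetical helper: true if sending to a full mailbox throws MailboxFull.
bool overflowThrows()
{
    setMaxMailboxSize(thisTid, 1, OnCrowding.throwException);
    // Restore the unbounded default before returning.
    scope (exit) setMaxMailboxSize(thisTid, 0, OnCrowding.block);

    send(thisTid, 1); // fills the single slot

    bool thrown = false;
    try
    {
        send(thisTid, 2); // the mailbox is already full
    }
    catch (MailboxFull)
    {
        thrown = true;
    }

    receiveOnly!int(); // drain the message that was accepted
    return thrown;
}

void main()
{
    assert(overflowThrows());
}
```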
void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis);

Sets a maximum mailbox size.

Sets a limit on the maximum number of user messages allowed in the mailbox. If this limit is reached, the caller attempting to add a new message will execute onCrowdingDoThis. If messages is zero, the mailbox is unbounded.

Parameters:
Tid tid The Tid of the thread for which this limit should be set.
size_t messages The maximum number of messages or zero if no limit.
bool function(Tid) onCrowdingDoThis The routine called when a message is sent to a full mailbox.
bool register(string name, Tid tid);

Associates name with tid.

Associates name with tid in a process-local map. When the thread represented by tid terminates, any names associated with it will be automatically unregistered.

Parameters:
string name The name to associate with tid.
Tid tid The Tid to register under name.
Returns:
true if the name is available and tid is not known to represent a defunct thread.
bool unregister(string name);

Removes the registered name associated with a tid.

Parameters:
string name The name to unregister.
Returns:
true if the name is registered, false if not.
Tid locate(string name);

Gets the Tid associated with name.

Parameters:
string name The name to locate within the registry.
Returns:
The associated Tid or Tid.init if name is not registered.
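The three registry functions can be exercised together as in the sketch below. The name "demo-worker" and the helper registryRoundTrip are hypothetical. The sketch registers the calling thread's own Tid and assumes the name is not already in use.

```d
import std.concurrency;

// Hypothetical helper: registers, looks up, and unregisters a name,
// returning the outcome of each step.
bool[] registryRoundTrip()
{
    enum name = "demo-worker"; // illustrative name only

    bool registered = register(name, thisTid);
    bool found = locate(name) == thisTid;
    // The name is taken now, so a second registration is refused.
    bool duplicateRejected = !register(name, thisTid);
    bool removed = unregister(name);
    // After removal, locate falls back to Tid.init.
    bool gone = locate(name) == Tid.init;

    return [registered, found, duplicateRejected, removed, gone];
}

void main()
{
    assert(registryRoundTrip() == [true, true, true, true, true]);
}
```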
struct ThreadInfo;

Encapsulates all implementation-level data needed for scheduling.

When defining a Scheduler, an instance of this struct must be associated with each logical thread. It contains all implementation-level information needed by the internal API.

static nothrow @property ref auto thisInfo();

Gets a thread-local instance of ThreadInfo.

Gets a thread-local instance of ThreadInfo, which should be used as the default instance when info is requested for a thread not created by the Scheduler.

void cleanup();

Cleans up this ThreadInfo.

This must be called when a scheduled thread terminates. It tears down the messaging system for the thread and notifies interested parties of the thread's termination.

interface Scheduler;

A Scheduler controls how threading is performed by spawn.

Implementing a Scheduler allows the concurrency mechanism used by this module to be customized according to different needs. By default, a call to spawn will create a new kernel thread that executes the supplied routine and terminates when finished. But it is possible to create Schedulers that reuse threads, that multiplex Fibers (coroutines) across a single thread, or any number of other approaches. By making the choice of Scheduler a user-level option, std.concurrency may be used for far more types of application than if this behavior were predefined.

Example
import std.concurrency;
import std.stdio;

void main()
{
    scheduler = new FiberScheduler;
    scheduler.start(
    {
        writeln("the rest of main goes here");
    });
}
Some schedulers have a dispatching loop that must run if they are to work properly, so for the sake of consistency, when using a scheduler, start() must be called within main(). This yields control to the scheduler and will ensure that any spawned threads are executed in an expected manner.
abstract void start(void delegate() op);

Spawns the supplied op and starts the Scheduler.

This is intended to be called at the start of the program to yield all scheduling to the active Scheduler instance. This is necessary for schedulers that explicitly dispatch threads rather than simply relying on the operating system to do so, and so start should always be called within main() to begin normal program execution.

Parameters:
void delegate() op A wrapper for whatever the main thread would have done in the absence of a custom scheduler. It will be automatically executed via a call to spawn by the Scheduler.
abstract void spawn(void delegate() op);

Assigns a logical thread to execute the supplied op.

This routine is called by spawn. It is expected to instantiate a new logical thread and run the supplied operation. If the scheduled thread is not a kernel thread, it must call thisInfo.cleanup() when it terminates. All kernel threads will have their ThreadInfo cleaned up automatically by a thread-local destructor.

Parameters:
void delegate() op The function to execute. This may be the actual function passed by the user to spawn itself, or may be a wrapper function.
abstract nothrow void yield();

Yields execution to another logical thread.

This routine is called at various points within concurrency-aware APIs to provide a scheduler a chance to yield execution when using some sort of cooperative multithreading model. If this is not appropriate, such as when each logical thread is backed by a dedicated kernel thread, this routine may be a no-op.

abstract nothrow @property ref ThreadInfo thisInfo();

Returns an appropriate ThreadInfo instance.

Returns an instance of ThreadInfo specific to the logical thread that is calling this routine or, if the calling thread was not created by this scheduler, returns ThreadInfo.thisInfo instead.

abstract nothrow Condition newCondition(Mutex m);

Creates a Condition variable analog for signaling.

Creates a new Condition variable analog which is used to check for and to signal the addition of messages to a thread's message queue. Like yield, some schedulers may need to define custom behavior so that calls to Condition.wait() yield to another thread when no new messages are available instead of blocking.

Parameters:
Mutex m The Mutex that will be associated with this condition. It will be locked prior to any operation on the condition, and so in some cases a Scheduler may need to hold this reference and unlock the mutex before yielding execution to another logical thread.
class ThreadScheduler: std.concurrency.Scheduler;

An example Scheduler using kernel threads.

This is an example Scheduler that mirrors the default scheduling behavior of creating one kernel thread per call to spawn. It is fully functional and may be instantiated and used, but is not a necessary part of the default functioning of this module.

void start(void delegate() op);

This simply runs op directly, since no real scheduling is needed by this approach.

void spawn(void delegate() op);

Creates a new kernel thread and assigns it to run the supplied op.

nothrow void yield();

This scheduler does no explicit multiplexing, so this is a no-op.

nothrow @property ref ThreadInfo thisInfo();

Returns ThreadInfo.thisInfo, since it is a thread-local instance of ThreadInfo, which is the correct behavior for this scheduler.

nothrow Condition newCondition(Mutex m);

Creates a new Condition variable. No custom behavior is needed here.
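One way to experiment with a custom Scheduler is to derive from ThreadScheduler and override a single method. The sketch below counts calls to spawn and otherwise defers to the base class. CountingScheduler, spawned, and countSpawns are hypothetical names, and the global scheduler is reset to null afterwards on the assumption that this restores the default behavior.

```d
import core.atomic : atomicLoad, atomicOp;
import std.concurrency;

// Hypothetical subclass: counts spawns, otherwise behaves like ThreadScheduler.
class CountingScheduler : ThreadScheduler
{
    private shared size_t count;

    override void spawn(void delegate() op)
    {
        atomicOp!"+="(count, 1);
        super.spawn(op); // still one kernel thread per call
    }

    size_t spawned() { return atomicLoad(count); }
}

// Hypothetical helper: runs two spawns under the counting scheduler.
size_t countSpawns()
{
    auto counting = new CountingScheduler;
    scheduler = counting;
    scope (exit) scheduler = null; // assumed to restore default scheduling

    // ThreadScheduler.start runs the delegate directly, so only the two
    // explicit spawn calls below are counted.
    counting.start({
        spawn({ ownerTid.send(true); });
        spawn({ ownerTid.send(true); });
        receiveOnly!bool();
        receiveOnly!bool();
    });
    return counting.spawned;
}

void main()
{
    assert(countSpawns() == 2);
}
```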

class FiberScheduler: std.concurrency.Scheduler;

An example Scheduler using Fibers.

This is an example scheduler that creates a new Fiber per call to spawn and multiplexes the execution of all fibers within the main thread.
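A minimal sketch of message passing between fibers under this scheduler, assuming the whole exchange runs inside the delegate given to start. doubledViaFibers is a hypothetical helper, and the global scheduler is reset to null afterwards on the assumption that this restores the default behavior.

```d
import std.concurrency;

// Hypothetical helper: a parent fiber sends 21 to a child fiber, which
// replies with twice the value it received.
int doubledViaFibers()
{
    int result;
    scheduler = new FiberScheduler;
    scope (exit) scheduler = null; // assumed to restore default scheduling

    scheduler.start({
        // Under FiberScheduler, spawn creates a Fiber, not a kernel thread.
        auto child = spawn({
            ownerTid.send(receiveOnly!int() * 2);
        });
        child.send(21);
        // Waiting here yields to the child fiber until its reply arrives.
        result = receiveOnly!int();
    });
    return result;
}

void main()
{
    assert(doubledViaFibers() == 42);
}
```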

void start(void delegate() op);

This creates a new Fiber for the supplied op and then starts the dispatcher.

nothrow void spawn(void delegate() op);

This creates a new Fiber for the supplied op and adds it to the dispatch list.

nothrow void yield();

If the caller is a scheduled Fiber, this yields execution to another scheduled Fiber.

nothrow @property ref ThreadInfo thisInfo();

Returns an appropriate ThreadInfo instance.

Returns a ThreadInfo instance specific to the calling Fiber if the Fiber was created by this dispatcher, otherwise it returns ThreadInfo.thisInfo.

nothrow Condition newCondition(Mutex m);

Returns a Condition analog that yields when wait or notify is called.

Bug
For the default implementation, notifyAll will behave like notify.
Parameters:
Mutex m A Mutex to use for locking if the condition needs to be waited on or notified from multiple Threads. If null, no Mutex will be used and it is assumed that the Condition is only waited on/notified from one Thread.
protected nothrow void create(void delegate() op);

Creates a new Fiber which calls the given delegate.

Parameters:
void delegate() op The delegate the fiber should call.
class InfoFiber: core.thread.fiber.Fiber;

Fiber which embeds a ThreadInfo

Scheduler scheduler;

Sets the Scheduler behavior within the program.

This variable sets the Scheduler behavior within this program. Typically, when setting a Scheduler, scheduler.start() should be called in main. This routine will not return until program execution is complete.

nothrow void yield();

If the caller is a Fiber and is not a Generator, this function will call scheduler.yield() or Fiber.yield(), as appropriate.

class Generator(T): Fiber, IsGenerator, InputRange!T;

A Generator is a Fiber that periodically returns values of type T to the caller via yield. This is represented as an InputRange.

Examples:
auto tid = spawn({
    int i;
    while (i < 9)
        i = receiveOnly!int;

    ownerTid.send(i * 2);
});

auto r = new Generator!int({
    foreach (i; 1 .. 10)
        yield(i);
});

foreach (e; r)
    tid.send(e);

writeln(receiveOnly!int); // 18
this(void function() fn);

Initializes a generator object which is associated with a static D function. The function will be called once to prepare the range for iteration.

Parameters:
void function() fn The fiber function.
In
fn must not be null.
this(void function() fn, size_t sz);

Initializes a generator object which is associated with a static D function. The function will be called once to prepare the range for iteration.

Parameters:
void function() fn The fiber function.
size_t sz The stack size for this fiber.
In
fn must not be null.
this(void function() fn, size_t sz, size_t guardPageSize);

Initializes a generator object which is associated with a static D function. The function will be called once to prepare the range for iteration.

Parameters:
void function() fn The fiber function.
size_t sz The stack size for this fiber.
size_t guardPageSize The size of the guard page used to trap stack overflows in the fiber. Refer to core.thread.Fiber's documentation for more details.
In
fn must not be null.
this(void delegate() dg);

Initializes a generator object which is associated with a dynamic D function. The function will be called once to prepare the range for iteration.

Parameters:
void delegate() dg The fiber function.
In
dg must not be null.
this(void delegate() dg, size_t sz);

Initializes a generator object which is associated with a dynamic D function. The function will be called once to prepare the range for iteration.

Parameters:
void delegate() dg The fiber function.
size_t sz The stack size for this fiber.
In
dg must not be null.
this(void delegate() dg, size_t sz, size_t guardPageSize);

Initializes a generator object which is associated with a dynamic D function. The function will be called once to prepare the range for iteration.

Parameters:
void delegate() dg The fiber function.
size_t sz The stack size for this fiber.
size_t guardPageSize The size of the guard page used to trap stack overflows in the fiber. Refer to core.thread.Fiber's documentation for more details.
In
dg must not be null.
final @property bool empty();

Returns true if the generator is empty.

final void popFront();

Obtains the next value from the underlying function.

final @property T front();

Returns the most recently generated value by shallow copy.

final T moveFront();

Returns the most recently generated value without executing a copy constructor. Will not compile for element types defining a postblit, because Generator does not return by reference.

void yield(T)(ref T value);

void yield(T)(T value);

Yields a value of type T to the caller of the currently executing generator.

Parameters:
T value The value to yield.
Examples:
import std.range;

InputRange!int myIota = iota(10).inputRangeObject;

myIota.popFront();
myIota.popFront();
writeln(myIota.moveFront); // 2
writeln(myIota.front); // 2
myIota.popFront();
writeln(myIota.front); // 3

//can be assigned to std.range.interfaces.InputRange directly
myIota = new Generator!int(
{
    foreach (i; 0 .. 10) yield(i);
});

myIota.popFront();
myIota.popFront();
writeln(myIota.moveFront); // 2
writeln(myIota.front); // 2
myIota.popFront();
writeln(myIota.front); // 3

size_t[2] counter = [0, 0];
foreach (i, unused; myIota) counter[] += [1, i];

assert(myIota.empty);
writeln(counter); // [7, 21]
ref auto initOnce(alias var)(lazy typeof(var) init);

Initializes var with the lazy init value in a thread-safe manner.

The implementation guarantees that all threads simultaneously calling initOnce with the same var argument block until var is fully initialized. All side-effects of init are globally visible afterwards.

Parameters:
var The variable to initialize
typeof(var) init The lazy initializer value
Returns:
A reference to the initialized variable
Examples:
A typical use-case is to perform lazy but thread-safe initialization.
static class MySingleton
{
    static MySingleton instance()
    {
        __gshared MySingleton inst;
        return initOnce!inst(new MySingleton);
    }
}

assert(MySingleton.instance !is null);
ref auto initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex);

ref auto initOnce(alias var)(lazy typeof(var) init, Mutex mutex);

Same as above, but takes a separate mutex instead of sharing one among all initOnce instances.

This should be used to avoid dead-locks when the init expression waits for the result of another thread that might also call initOnce. Use with care.

Parameters:
var The variable to initialize
typeof(var) init The lazy initializer value
Mutex mutex A mutex to prevent race conditions
Returns:
A reference to the initialized variable
Examples:
Use a separate mutex when init blocks on another thread that might also call initOnce.
import core.sync.mutex : Mutex;

static shared bool varA, varB;
static shared Mutex m;
m = new shared Mutex;

spawn({
    // use a different mutex for varB to avoid a dead-lock
    initOnce!varB(true, m);
    ownerTid.send(true);
});
// init depends on the result of the spawned thread
initOnce!varA(receiveOnly!bool);
writeln(varA); // true
writeln(varB); // true

© 1999–2021 The D Language Foundation
Licensed under the Boost License 1.0.
https://dlang.org/phobos/std_concurrency.html