Tasks and Parallel Computing
Tasks
-
Task(func)
-
Create a Task (i.e. coroutine) to execute the given function (which must be callable with no arguments). The task exits when this function returns.
-
yieldto(task, arg = nothing)
-
Switch to the given task. The first time a task is switched to, the task’s function is called with no arguments. On subsequent switches, arg is returned from the task’s last call to yieldto. This is a low-level call that only switches tasks, not considering states or scheduling in any way. Its use is discouraged.
-
current_task()
-
Get the currently running Task.
-
istaskdone(task) → Bool
-
Determine whether a task has exited.
-
istaskstarted(task) → Bool
-
Determine whether a task has started executing.
-
consume(task, values...)
-
Receive the next value passed to produce by the specified task. Additional arguments may be passed, to be returned from the last produce call in the producer.
-
produce(value)
-
Send the given value to the last consume call, switching to the consumer task. If the next consume call passes any values, they are returned by produce.
-
yield()
-
Switch to the scheduler to allow another scheduled task to run. A task that calls this function is still runnable, and will be restarted immediately if there are no other runnable tasks.
-
task_local_storage(key)
-
Look up the value of a key in the current task’s task-local storage.
-
task_local_storage(key, value)
-
Assign a value to a key in the current task’s task-local storage.
-
task_local_storage(body, key, value)
-
Call the function body with a modified task-local storage, in which value is assigned to key; the previous value of key, or lack thereof, is restored afterwards. Useful for emulating dynamic scoping.
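A minimal sketch of the three task_local_storage forms, written against the release 0.5 API described on this page. The key :request_id and the stored values are hypothetical and chosen only for illustration.

```julia
# Assign a value to a (hypothetical) key in the current task's storage.
task_local_storage(:request_id, 1)

# Temporarily rebind the key while the body runs; the body's result is returned.
inner = task_local_storage(:request_id, 2) do
    task_local_storage(:request_id)
end

# After the body returns, the previous value is visible again.
outer = task_local_storage(:request_id)
```

Here inner sees the temporary binding while outer sees the restored one, which is the dynamic-scoping behaviour the description refers to.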
-
Condition()
-
Create an edge-triggered event source that tasks can wait for. Tasks that call wait on a Condition are suspended and queued. Tasks are woken up when notify is later called on the Condition. Edge triggering means that only tasks waiting at the time notify is called can be woken up. For level-triggered notifications, you must keep extra state to keep track of whether a notification has happened. The Channel type does this, and so can be used for level-triggered events.
-
notify(condition, val=nothing; all=true, error=false)
-
Wake up tasks waiting for a condition, passing them val. If all is true (the default), all waiting tasks are woken, otherwise only one is. If error is true, the passed value is raised as an exception in the woken tasks.
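The interplay of Condition, wait and notify can be sketched as follows. This is an illustrative example only: the names waiter and results are hypothetical, and it assumes that a single yield() is enough for the newly scheduled task to reach its wait call.

```julia
c = Condition()
results = Int[]

# The waiter suspends in wait(c) until another task calls notify.
waiter = @async begin
    val = wait(c)          # returns the value passed to notify
    push!(results, val)
end

yield()          # let the waiter run until it blocks on the condition
notify(c, 42)    # wake all tasks currently waiting, passing them 42
wait(waiter)     # block until the waiter has finished
```

Because the condition is edge-triggered, calling notify before the waiter had reached wait(c) would have woken nobody; that is why the sketch yields first.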
-
schedule(t::Task, [val]; error=false)
-
Add a task to the scheduler’s queue. This causes the task to run constantly when the system is otherwise idle, unless the task performs a blocking operation such as wait.
If a second argument val is provided, it will be passed to the task (via the return value of yieldto) when it runs again. If error is true, the value is raised as an exception in the woken task.
-
@schedule()
-
Wrap an expression in a Task and add it to the local machine’s scheduler queue.
-
@task()
-
Wrap an expression in a Task without executing it, and return the Task. This only creates a task, and does not run it.
-
sleep(seconds)
-
Block the current task for a specified number of seconds. The minimum sleep time is 1 millisecond, i.e. an input of 0.001.
-
Channel{T}(sz::Int)
-
Constructs a Channel that can hold a maximum of sz objects of type T. put! calls on a full channel block till an object is removed with take!.
Other constructors:
-
Channel() - equivalent to Channel{Any}(32)
-
Channel(sz::Int) - equivalent to Channel{Any}(sz)
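As a rough illustration of the blocking behaviour of a bounded channel, the sketch below pairs a producer task with a consumer loop. The capacity of 2 and the item count of 5 are arbitrary choices for the example.

```julia
ch = Channel{Int}(2)       # holds at most two Ints at a time

# The producer blocks in put! whenever two items are already buffered.
producer = @async begin
    for i in 1:5
        put!(ch, i)
    end
end

# take! blocks until an item is available and removes it in FIFO order.
received = Int[]
for i in 1:5
    push!(received, take!(ch))
end
```

The producer and consumer never need an explicit lock here; the channel itself suspends whichever side has run ahead.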
-
General Parallel Computing Support
-
addprocs(np::Integer; restrict=true, kwargs...) → List of process identifiers
-
Launches workers using the in-built LocalManager which only launches workers on the local host. This can be used to take advantage of multiple cores. addprocs(4) will add 4 processes on the local machine. If restrict is true, binding is restricted to 127.0.0.1.
-
addprocs(; kwargs...) → List of process identifiers
-
Equivalent to addprocs(Sys.CPU_CORES; kwargs...)
Note that workers do not run a .juliarc.jl startup script, nor do they synchronize their global state (such as global variables, new method definitions, and loaded modules) with any of the other running processes.
-
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) → List of process identifiers
-
Add processes on remote machines via SSH. Requires julia to be installed in the same location on each node, or to be available via a shared file system.
machines is a vector of machine specifications. Workers are started for each specification.
A machine specification is either a string machine_spec or a tuple (machine_spec, count).
machine_spec is a string of the form [user@]host[:port] [bind_addr[:port]]. user defaults to current user, port to the standard ssh port. If [bind_addr[:port]] is specified, other workers will connect to this worker at the specified bind_addr and port.
count is the number of workers to be launched on the specified host. If specified as :auto it will launch as many workers as the number of cores on the specific host.
Keyword arguments:
- tunnel: if true then SSH tunneling will be used to connect to the worker from the master process. Default is false.
- sshflags: specifies additional ssh options, e.g. sshflags=`-i /home/foo/bar.pem`
- max_parallel: specifies the maximum number of workers connected to in parallel at a host. Defaults to 10.
- dir: specifies the working directory on the workers. Defaults to the host’s current directory (as found by pwd())
- exename: name of the julia executable. Defaults to "$JULIA_HOME/julia" or "$JULIA_HOME/julia-debug" as the case may be.
- exeflags: additional flags passed to the worker processes.
- topology: Specifies how the workers connect to each other. Sending a message between unconnected workers results in an error.
  - topology=:all_to_all: All processes are connected to each other. This is the default.
  - topology=:master_slave: Only the driver process, i.e. pid 1, connects to the workers. The workers do not connect to each other.
  - topology=:custom: The launch method of the cluster manager specifies the connection topology via fields ident and connect_idents in WorkerConfig. A worker with a cluster manager identity ident will connect to all workers specified in connect_idents.
Environment variables:
If the master process fails to establish a connection with a newly launched worker within 60.0 seconds, the worker treats it as a fatal situation and terminates. This timeout can be controlled via environment variable JULIA_WORKER_TIMEOUT. The value of JULIA_WORKER_TIMEOUT on the master process specifies the number of seconds a newly launched worker waits for connection establishment.
-
addprocs(manager::ClusterManager; kwargs...) → List of process identifiers
-
Launches worker processes via the specified cluster manager.
For example Beowulf clusters are supported via a custom cluster manager implemented in the package ClusterManagers.jl.
The number of seconds a newly launched worker waits for connection establishment from the master can be specified via variable JULIA_WORKER_TIMEOUT in the worker process’s environment. Relevant only when using TCP/IP as transport.
-
nprocs()
-
Get the number of available processes.
-
nworkers()
-
Get the number of available worker processes. This is one less than nprocs(). Equal to nprocs() if nprocs() == 1.
-
procs()
-
Returns a list of all process identifiers.
-
procs(pid::Integer)
-
Returns a list of all process identifiers on the same physical node. Specifically all workers bound to the same ip-address as pid are returned.
-
workers()
-
Returns a list of all worker process identifiers.
-
rmprocs(pids...; waitfor=0.0)
-
Removes the specified workers. Note that only process 1 can add or remove workers - if another worker tries to call rmprocs, an error will be thrown. The optional argument waitfor determines how long the first process will wait for the workers to shut down.
-
interrupt(pids::AbstractVector=workers())
-
Interrupt the currently executing task on the specified workers. This is equivalent to pressing Ctrl-C on the local machine. If no arguments are given, all workers are interrupted.
-
interrupt(pids::Integer...)
-
Interrupt the currently executing task on the specified workers. This is equivalent to pressing Ctrl-C on the local machine. If no arguments are given, all workers are interrupted.
-
myid()
-
Get the id of the current process.
-
asyncmap(f, c...) → collection
-
Transform collection c by applying @async f to each element.
For multiple collection arguments, apply f elementwise.
-
pmap([::AbstractWorkerPool, ]f, c...; distributed=true, batch_size=1, on_error=nothing, retry_n=0, retry_max_delay=DEFAULT_RETRY_MAX_DELAY, retry_on=DEFAULT_RETRY_ON) → collection
-
Transform collection c by applying f to each element using available workers and tasks.
For multiple collection arguments, apply f elementwise.
Note that f must be made available to all worker processes; see Code Availability and Loading Packages for details.
If a worker pool is not specified, all available workers, i.e., the default worker pool is used.
By default, pmap distributes the computation over all specified workers. To use only the local process and distribute over tasks, specify distributed=false. This is equivalent to asyncmap.
pmap can also use a mix of processes and tasks via the batch_size argument. For batch sizes greater than 1, the collection is split into multiple batches, which are distributed across workers. Each such batch is processed in parallel via tasks in each worker. The specified batch_size is an upper limit; the actual size of batches may be smaller and is calculated depending on the number of workers available and length of the collection.
Any error stops pmap from processing the remainder of the collection. To override this behavior you can specify an error handling function via argument on_error which takes in a single argument, i.e., the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value which is then returned inline with the results to the caller.
Failed computation can also be retried via retry_on, retry_n, retry_max_delay, which are passed through to retry as arguments retry_on, n and max_delay respectively. If batching is specified, and an entire batch fails, all items in the batch are retried.
The following are equivalent:
- pmap(f, c; distributed=false) and asyncmap(f,c)
- pmap(f, c; retry_n=1) and asyncmap(retry(remote(f)),c)
- pmap(f, c; retry_n=1, on_error=e->e) and asyncmap(x->try retry(remote(f))(x) catch e; e end, c)
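The following sketch shows basic pmap usage under the release 0.5 API described on this page. It assumes two local workers can be started with addprocs; the function name square is hypothetical.

```julia
addprocs(2)                        # start two workers on the local host

# Make the (hypothetical) function available on every process.
@everywhere square(x) = x^2

# Distribute the calls over the workers; results come back in input order.
squares = pmap(square, 1:5)

# Run the same computation with tasks in the local process only.
local_squares = pmap(square, 1:5; distributed=false)
```

Defining square with @everywhere matters: a function defined only on process 1 would not be known to the workers that execute it.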
-
-
remotecall(f, id::Integer, args...; kwargs...) → Future
-
Call a function f asynchronously on the given arguments on the specified process. Returns a Future. Keyword arguments, if any, are passed through to f.
-
Base.process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
-
Called by cluster managers using custom transports. It should be called when the custom transport implementation receives the first message from a remote worker. The custom transport must manage a logical connection to the remote worker and provide two IO objects, one for incoming messages and the other for messages addressed to the remote worker. If incoming is true, the remote peer initiated the connection. Whichever of the pair initiates the connection sends the cluster cookie and its Julia version number to perform the authentication handshake.
-
RemoteException(captured)
-
Exceptions on remote computations are captured and rethrown locally. A RemoteException wraps the pid of the worker and a captured exception. A CapturedException captures the remote exception and a serializable form of the call stack when the exception was raised.
-
Future(pid::Integer=myid())
-
Create a Future on process pid. The default pid is the current process.
-
RemoteChannel(pid::Integer=myid())
-
Make a reference to a Channel{Any}(1) on process pid. The default pid is the current process.
-
RemoteChannel(f::Function, pid::Integer=myid())
-
Create references to remote channels of a specific size and type. f() is a function that when executed on pid must return an implementation of an AbstractChannel.
For example, RemoteChannel(()->Channel{Int}(10), pid), will return a reference to a channel of type Int and size 10 on pid.
The default pid is the current process.
-
wait([x])
-
Block the current task until some event occurs, depending on the type of the argument:
- RemoteChannel: Wait for a value to become available on the specified remote channel.
- Future: Wait for a value to become available for the specified future.
- Channel: Wait for a value to be appended to the channel.
- Condition: Wait for notify on a condition.
- Process: Wait for a process or process chain to exit. The exitcode field of a process can be used to determine success or failure.
- Task: Wait for a Task to finish, returning its result value. If the task fails with an exception, the exception is propagated (re-thrown in the task that called wait).
- RawFD: Wait for changes on a file descriptor (see poll_fd for keyword arguments and return code)
If no argument is passed, the task blocks for an undefined period. A task can only be restarted by an explicit call to schedule or yieldto.
Often wait is called within a while loop to ensure a waited-for condition is met before proceeding.
-
fetch(x)
-
Waits and fetches a value from x depending on the type of x. Does not remove the item fetched:
- Future: Wait for and get the value of a Future. The fetched value is cached locally. Further calls to fetch on the same reference return the cached value. If the remote value is an exception, throws a RemoteException which captures the remote exception and backtrace.
- RemoteChannel: Wait for and get the value of a remote reference. Exceptions raised are the same as for a Future.
- Channel: Wait for and get the first available item from the channel.
-
-
remotecall_wait(f, id::Integer, args...; kwargs...)
-
Perform a faster wait(remotecall(...)) in one message on the Worker specified by worker id id. Keyword arguments, if any, are passed through to f.
-
remotecall_fetch(f, id::Integer, args...; kwargs...)
-
Perform fetch(remotecall(...)) in one message. Keyword arguments, if any, are passed through to f. Any remote exceptions are captured in a RemoteException and thrown.
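To make the relationship between remotecall, fetch and remotecall_fetch concrete, here is a small sketch against the release 0.5 API described on this page. It assumes one local worker can be started; the arithmetic is arbitrary.

```julia
pid = first(addprocs(1))          # start one local worker and keep its id

# remotecall returns a Future immediately; fetch waits for the result.
fut = remotecall(+, pid, 1, 2)
total = fetch(fut)

# remotecall_fetch combines both steps in a single message.
product = remotecall_fetch(*, pid, 6, 7)

rmprocs(pid)                      # shut the worker down again
```

The two-step form is useful when other work can be done between issuing the call and needing its value; otherwise the combined call saves a round trip.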
-
put!(rr::RemoteChannel, args...)
-
Store a set of values to the RemoteChannel. If the channel is full, blocks until space is available. Returns its first argument.
-
put!(rr::Future, v)
-
Store a value to a Future rr. Futures are write-once remote references. A put! on an already set Future throws an Exception. All asynchronous remote calls return Futures and set the value to the return value of the call upon completion.
-
put!(c::Channel, v)
-
Appends an item v to the channel c. Blocks if the channel is full.
-
take!(rr::RemoteChannel, args...)
-
Fetch value(s) from a remote channel, removing the value(s) in the process.
-
take!(c::Channel)
-
Removes and returns a value from a Channel. Blocks till data is available.
-
isready(c::Channel)
-
Determine whether a Channel has a value stored to it. isready on Channels is non-blocking.
-
isready(rr::RemoteChannel, args...)
-
Determine whether a RemoteChannel has a value stored to it. Note that this function can cause race conditions, since by the time you receive its result it may no longer be true. However, it can be safely used on a Future since they are assigned only once.
-
isready(rr::Future)
-
Determine whether a Future has a value stored to it.
If the argument Future is owned by a different node, this call will block to wait for the answer. It is recommended to wait for rr in a separate task instead or to use a local Channel as a proxy:
    c = Channel(1)
    @async put!(c, remotecall_fetch(long_computation, p))
    isready(c)  # will not block
-
close(c::Channel)
-
Closes a channel. An exception is thrown by:
- put! on a closed channel.
- take! and fetch on an empty, closed channel.
-
-
WorkerPool(workers)
-
Create a WorkerPool from a vector of worker ids.
-
CachingPool(workers::Vector{Int})
-
An implementation of an AbstractWorkerPool. remote, remotecall_fetch, pmap and other remote calls which execute functions remotely, benefit from caching the serialized/deserialized functions on the worker nodes, especially for closures which capture large amounts of data.
The remote cache is maintained for the lifetime of the returned CachingPool object. To clear the cache earlier, use clear!(pool).
For global variables, only the bindings are captured in a closure, not the data. let blocks can be used to capture global data.
For example:
    const foo=rand(10^8);
    wp=CachingPool(workers())
    let foo=foo
        pmap(wp, i->sum(foo)+i, 1:100);
    end
The above would transfer foo only once to each worker.
-
default_worker_pool()
-
WorkerPool containing idle workers() (used by remote(f)).
-
remote([::AbstractWorkerPool, ]f) → Function
-
Returns a lambda that executes function f on an available worker using remotecall_fetch.
-
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...)
-
Call f(args...; kwargs...) on one of the workers in pool. Returns a Future.
-
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...)
-
Call f(args...; kwargs...) on one of the workers in pool. Waits for completion, returns a Future.
-
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...)
-
Call f(args...; kwargs...) on one of the workers in pool. Waits for completion and returns the result.
-
timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)
-
Waits till testcb returns true or for secs seconds, whichever is earlier. testcb is polled every pollint seconds.
-
@spawn()
-
Creates a closure around an expression and runs it on an automatically-chosen process, returning a Future to the result.
-
@spawnat()
-
Accepts two arguments, p and an expression. A closure is created around the expression and run asynchronously on process p. Returns a Future to the result.
-
@fetch()
-
Equivalent to fetch(@spawn expr).
-
@fetchfrom()
-
Equivalent to fetch(@spawnat p expr).
-
@async()
-
Like @schedule, @async wraps an expression in a Task and adds it to the local machine’s scheduler queue. Additionally it adds the task to the set of items that the nearest enclosing @sync waits for. @async also wraps the expression in a let x=x, y=y, ... block to create a new scope with copies of all variables referenced in the expression.
-
@sync()
-
Wait until all dynamically-enclosed uses of @async, @spawn, @spawnat and @parallel are complete. All exceptions thrown by enclosed async operations are collected and thrown as a CompositeException.
-
@parallel()
-
A parallel for loop of the form:
    @parallel [reducer] for var = range
        body
    end
The specified range is partitioned and locally executed across all workers. In case an optional reducer function is specified, @parallel performs local reductions on each worker with a final reduction on the calling process.
Note that without a reducer function, @parallel executes asynchronously, i.e. it spawns independent tasks on all available workers and returns immediately without waiting for completion. To wait for completion, prefix the call with @sync, like:
    @sync @parallel for var = range
        body
    end
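A hedged sketch of the reducer form, using the release 0.5 API described on this page and assuming two local workers can be started. The sum of squares is just an illustrative workload.

```julia
addprocs(2)                  # start two workers on the local host

# Each worker sums the squares of its share of the range; the partial
# sums are then combined with (+) on the calling process.
total = @parallel (+) for i = 1:100
    i^2                      # the value of the last expression is reduced
end
```

With a reducer the macro waits for the result, so no @sync prefix is needed in this form.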
-
@everywhere()
-
Execute an expression on all processes. Errors on any of the processes are collected into a CompositeException and thrown. For example:
    @everywhere bar=1
will define bar under module Main on all processes.
Unlike @spawn and @spawnat, @everywhere does not capture any local variables. Prefixing @everywhere with @eval allows us to broadcast local variables using interpolation:
    foo = 1
    @eval @everywhere bar=$foo
-
clear!(pool::CachingPool) → pool
-
Removes all cached functions from all participating workers.
-
Base.remoteref_id(r::AbstractRemoteRef) → RRID
-
Futures and RemoteChannels are identified by fields:
- where - refers to the node where the underlying object/storage referred to by the reference actually exists.
- whence - refers to the node the remote reference was created from. Note that this is different from the node where the underlying object referred to actually exists. For example calling RemoteChannel(2) from the master process would result in a where value of 2 and a whence value of 1.
- id is unique across all references created from the worker specified by whence.
Taken together, whence and id uniquely identify a reference across all workers.
Base.remoteref_id is a low-level API which returns a Base.RRID object that wraps whence and id values of a remote reference.
-
Base.channel_from_id(id) → c
-
A low-level API which returns the backing AbstractChannel for an id returned by Base.remoteref_id(). The call is valid only on the node where the backing channel exists.
-
Base.worker_id_from_socket(s) → pid
-
A low-level API which, given an IO connection or a Worker, returns the pid of the worker it is connected to. This is useful when writing custom serialize methods for a type, which optimize the data written out depending on the receiving process id.
-
Base.cluster_cookie() → cookie
-
Returns the cluster cookie.
-
Base.cluster_cookie(cookie) → cookie
-
Sets the passed cookie as the cluster cookie, then returns it.
Shared Arrays
-
Construct a SharedArray of a bitstype T and size dims across the processes specified by pids - all of which have to be on the same host.
If pids is left unspecified, the shared array will be mapped across all processes on the current host, including the master. But, localindexes and indexpids will only refer to worker processes. This facilitates work distribution code to use workers for actual computation with the master process acting as a driver.
If an init function of the type initfn(S::SharedArray) is specified, it is called on all the participating workers.
-
SharedArray(filename::AbstractString, T::Type, dims::NTuple, [offset=0]; mode=nothing, init=false, pids=Int[])
-
Construct a SharedArray backed by the file filename, with element type T (must be a bitstype) and size dims, across the processes specified by pids - all of which have to be on the same host. This file is mmapped into the host memory, with the following consequences:
- The array data must be represented in binary format (e.g., an ASCII format like CSV cannot be supported)
- Any changes you make to the array values (e.g., A[3] = 0) will also change the values on disk
If pids is left unspecified, the shared array will be mapped across all processes on the current host, including the master. But, localindexes and indexpids will only refer to worker processes. This facilitates work distribution code to use workers for actual computation with the master process acting as a driver.
mode must be one of "r", "r+", "w+", or "a+", and defaults to "r+" if the file specified by filename already exists, or "w+" if not. If an init function of the type initfn(S::SharedArray) is specified, it is called on all the participating workers. You cannot specify an init function if the file is not writable.
offset allows you to skip the specified number of bytes at the beginning of the file.
-
procs(S::SharedArray)
-
Get the vector of processes that have mapped the shared array.
-
sdata(S::SharedArray)
-
Returns the actual Array object backing S.
-
indexpids(S::SharedArray)
-
Returns the index of the current worker into the pids vector, i.e., the list of workers mapping the SharedArray.
-
localindexes(S::SharedArray)
-
Returns a range describing the “default” indexes to be handled by the current process. This range should be interpreted in the sense of linear indexing, i.e., as a sub-range of 1:length(S). In multi-process contexts, returns an empty range in the parent process (or any process for which indexpids returns 0).
It’s worth emphasizing that localindexes exists purely as a convenience, and you can partition work on the array among workers any way you wish. For a SharedArray, all indexes should be equally fast for each worker process.
Multi-Threading
This experimental interface supports Julia’s multi-threading capabilities. Types and functions described here might (and likely will) change in the future.
-
Threads.threadid()
-
Get the ID number of the current thread of execution. The master thread has ID 1.
-
Threads.nthreads()
-
Get the number of threads available to the Julia process. This is the inclusive upper bound on threadid().
-
Threads.@threads()
-
A macro to parallelize a for-loop to run with multiple threads. This spawns nthreads() number of threads, splits the iteration space amongst them, and iterates in parallel. A barrier is placed at the end of the loop which waits for all the threads to finish execution, and the loop returns.
-
Threads.Atomic{T}()
-
Holds a reference to an object of type T, ensuring that it is only accessed atomically, i.e. in a thread-safe manner.
Only certain “simple” types can be used atomically, namely the bitstypes integer and floating-point types. These are Int8...Int128, UInt8...UInt128, and Float16...Float64.
New atomic objects can be created from non-atomic values; if none is specified, the atomic object is initialized with zero.
Atomic objects can be accessed using the [] notation:
    x::Atomic{Int}
    x[] = 1
    val = x[]
Atomic operations use an atomic_ prefix, such as atomic_add!, atomic_xchg!, etc.
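As an illustration of why atomic objects are useful, the sketch below increments a shared counter from a threaded loop. The function name count_atomically is hypothetical; the result should be the same regardless of how many threads the process was started with.

```julia
using Base.Threads

# Increment a shared counter n times from a threaded loop.
function count_atomically(n)
    counter = Atomic{Int}(0)
    @threads for i in 1:n
        atomic_add!(counter, 1)   # safe even when iterations run concurrently
    end
    return counter[]              # read the final value
end

total = count_atomically(1000)
```

A plain Int updated with += in the same loop could lose increments when two threads read the old value at the same time.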
-
Threads.atomic_cas!{T}(x::Atomic{T}, cmp::T, newval::T)
-
Atomically compare-and-set x.
Atomically compares the value in x with cmp. If equal, write newval to x. Otherwise, leaves x unmodified. Returns the old value in x. By comparing the returned value to cmp (via ===) one knows whether x was modified and now holds the new value newval.
For further details, see LLVM’s cmpxchg instruction.
This function can be used to implement transactional semantics. Before the transaction, one records the value in x. After the transaction, the new value is stored only if x has not been modified in the mean time.
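The transactional pattern described above is usually written as a retry loop. The following is a sketch only; atomic_double! is a hypothetical helper, and doubling stands in for any update computed from the old value.

```julia
using Base.Threads

# Atomically replace the value in x with twice its value, retrying
# whenever another thread changed x between the read and the write.
function atomic_double!(x::Atomic{Int})
    while true
        old = x[]                               # record the current value
        if atomic_cas!(x, old, 2 * old) === old
            return old                          # the swap succeeded
        end
    end
end

x = Atomic{Int}(21)
previous = atomic_double!(x)
```

The loop only exits once the value it read is still the value in x at the moment of the write, so no concurrent update is silently overwritten.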
-
Threads.atomic_xchg!{T}(x::Atomic{T}, newval::T)
-
Atomically exchange the value in x.
Atomically exchanges the value in x with newval. Returns the old value.
For further details, see LLVM’s atomicrmw xchg instruction.
-
Threads.atomic_add!{T}(x::Atomic{T}, val::T)
-
Atomically add val to x.
Performs x[] += val atomically. Returns the old (!) value.
For further details, see LLVM’s atomicrmw add instruction.
-
Threads.atomic_sub!{T}(x::Atomic{T}, val::T)
-
Atomically subtract val from x.
Performs x[] -= val atomically. Returns the old (!) value.
For further details, see LLVM’s atomicrmw sub instruction.
-
Threads.atomic_and!{T}(x::Atomic{T}, val::T)
-
Atomically bitwise-and x with val.
Performs x[] &= val atomically. Returns the old (!) value.
For further details, see LLVM’s atomicrmw and instruction.
-
Threads.atomic_nand!{T}(x::Atomic{T}, val::T)
-
Atomically bitwise-nand (not-and) x with val.
Performs x[] = ~(x[] & val) atomically. Returns the old (!) value.
For further details, see LLVM’s atomicrmw nand instruction.
-
Threads.atomic_or!{T}(x::Atomic{T}, val::T)
-
Atomically bitwise-or x with val.
Performs x[] |= val atomically. Returns the old (!) value.
For further details, see LLVM’s atomicrmw or instruction.
-
Threads.atomic_xor!{T}(x::Atomic{T}, val::T)
-
Atomically bitwise-xor (exclusive-or) x with val.
Performs x[] $= val atomically. Returns the old (!) value.
For further details, see LLVM’s atomicrmw xor instruction.
-
Threads.atomic_max!{T}(x::Atomic{T}, val::T)
-
Atomically store the maximum of x and val in x.
Performs x[] = max(x[], val) atomically. Returns the old (!) value.
For further details, see LLVM’s atomicrmw max instruction.
-
Threads.atomic_min!{T}(x::Atomic{T}, val::T)
-
Atomically store the minimum of x and val in x.
Performs x[] = min(x[], val) atomically. Returns the old (!) value.
For further details, see LLVM’s atomicrmw min instruction.
-
Threads.atomic_fence()
-
Insert a sequential-consistency memory fence
Inserts a memory fence with sequentially-consistent ordering semantics. There are algorithms where this is needed, i.e. where an acquire/release ordering is insufficient.
This is likely a very expensive operation. Given that all other atomic operations in Julia already have acquire/release semantics, explicit fences should not be necessary in most cases.
For further details, see LLVM’s fence instruction.
ccall using a threadpool (Experimental)
-
@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)
-
The @threadcall macro is called in the same way as ccall but does the work in a different thread. This is useful when you want to call a blocking C function without causing the main julia thread to become blocked. Concurrency is limited by size of the libuv thread pool, which defaults to 4 threads but can be increased by setting the UV_THREADPOOL_SIZE environment variable and restarting the julia process.
Note that the called function should never call back into Julia.
Synchronization Primitives
-
AbstractLock
-
Abstract supertype describing types that implement the thread-safe synchronization primitives: lock, trylock, unlock, and islocked.
-
lock(the_lock)
-
Acquires the lock when it becomes available. If the lock is already locked by a different task/thread, it waits for it to become available.
Each lock must be matched by an unlock.
-
unlock(the_lock)
-
Releases ownership of the lock.
If this is a recursive lock which has been acquired before, it just decrements an internal counter and returns immediately.
-
trylock(the_lock) → Success (Boolean)
-
Acquires the lock if it is available, returning true if successful. If the lock is already locked by a different task/thread, returns false.
Each successful trylock must be matched by an unlock.
-
islocked(the_lock) → Status (Boolean)
-
Check whether the lock is held by any task/thread. This should not be used for synchronization (see instead trylock).
-
ReentrantLock()
-
Creates a reentrant lock for synchronizing Tasks. The same task can acquire the lock as many times as required. Each lock must be matched with an unlock.
This lock is NOT threadsafe. See Threads.Mutex for a threadsafe lock.
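One way to keep every lock matched by an unlock is to wrap the pair in try/finally. The sketch below is illustrative only: guarded is a hypothetical helper, and events is a hypothetical log used to show the order of execution.

```julia
lk = ReentrantLock()
events = String[]

# Run f while holding the lock, releasing it even if f throws.
function guarded(f, l)
    lock(l)
    try
        return f()
    finally
        unlock(l)
    end
end

guarded(lk) do
    push!(events, "outer")
    # Re-acquiring from the same task does not deadlock on a reentrant lock.
    guarded(lk) do
        push!(events, "inner")
    end
end
```

Each nested acquisition is released by its own finally block, so the lock is fully released once the outer call returns.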
-
Mutex()
-
These are standard system mutexes for locking critical sections of logic.
On Windows, this is a critical section object, on pthreads, this is a pthread_mutex_t.
See also SpinLock for a lighter-weight lock.
-
SpinLock()
-
Creates a non-reentrant lock. Recursive use will result in a deadlock. Each lock must be matched with an unlock.
Test-and-test-and-set spin locks are quickest up to about 30ish contending threads. If you have more contention than that, perhaps a lock is the wrong way to synchronize.
See also RecursiveSpinLock for a version that permits recursion.
See also Mutex for a more efficient version on one core or if the lock may be held for a considerable length of time.
-
RecursiveSpinLock()
-
Creates a reentrant lock. The same thread can acquire the lock as many times as required. Each lock must be matched with an unlock.
See also SpinLock for a slightly faster version.
See also Mutex for a more efficient version on one core or if the lock may be held for a considerable length of time.
-
Semaphore(sem_size)
-
Creates a counting semaphore that allows at most sem_size acquires to be in use at any time. Each acquire must be matched with a release.
This construct is NOT threadsafe.
-
acquire(s::Semaphore)
-
Wait for one of the sem_size permits to be available, blocking until one can be acquired.
-
release(s::Semaphore)
-
Return one permit to the pool, possibly allowing another task to acquire it and resume execution.
Cluster Manager Interface
This interface provides a mechanism to launch and manage Julia workers on different cluster environments. LocalManager, for launching additional workers on the same host, and SSHManager, for launching on remote hosts via ssh, are present in Base. TCP/IP sockets are used to connect and transport messages between processes. It is possible for Cluster Managers to provide a different transport.
-
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
-
Implemented by cluster managers. For every Julia worker launched by this function, it should append a WorkerConfig entry to launched and notify launch_ntfy. The function MUST exit once all workers requested by manager have been launched. params is a dictionary of all keyword arguments addprocs was called with.
-
manage(manager::ClusterManager, id::Integer, config::WorkerConfig, op::Symbol)
-
Implemented by cluster managers. It is called on the master process, during a worker’s lifetime, with appropriate op values:
- with :register/:deregister when a worker is added / removed from the Julia worker pool.
- with :interrupt when interrupt(workers) is called. The ClusterManager should signal the appropriate worker with an interrupt signal.
- with :finalize for cleanup purposes.
-
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
-
Implemented by cluster managers. It is called on the master process, by rmprocs. It should cause the remote worker specified by pid to exit. Base.kill(manager::ClusterManager.....) executes a remote exit() on pid.
-
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
-
Called by cluster managers implementing custom transports. It initializes a newly launched process as a worker. Command line argument --worker has the effect of initializing a process as a worker using TCP/IP sockets for transport. cookie is a cluster_cookie().
-
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) → (instrm::IO, outstrm::IO)
-
Implemented by cluster managers using custom transports. It should establish a logical connection to worker with id pid, specified by config and return a pair of IO objects. Messages from pid to current process will be read off instrm, while messages to be sent to pid will be written to outstrm. The custom transport implementation must ensure that messages are delivered and received completely and in order. Base.connect(manager::ClusterManager.....) sets up TCP/IP socket connections in-between workers.
© 2009–2016 Jeff Bezanson, Stefan Karpinski, Viral B. Shah, and other contributors
Licensed under the MIT License.
https://docs.julialang.org/en/release-0.5/stdlib/parallel/