Registry
A local, decentralized and scalable key-value process storage.
It allows developers to lookup one or more processes with a given key. If the registry has :unique
keys, a key points to 0 or 1 processes. If the registry allows :duplicate
keys, a single key may point to any number of processes. In both cases, different keys could identify the same process.
Each entry in the registry is associated to the process that has registered the key. If the process crashes, the keys associated to that process are automatically removed. All key comparisons in the registry are done using the match operation (===
).
The registry can be used for different purposes, such as name lookups (using the :via
option), storing properties, custom dispatching rules, or a pubsub implementation. We explore some of those use cases below.
The registry may also be transparently partitioned, which provides more scalable behaviour for running registries on highly concurrent environments with thousands or millions of entries.
Using in :via
Once the registry is started with a given name (using Registry.start_link/2
), it can be used to register and access named processes using the {:via, Registry, {registry, key}}
tuple:
{:ok, _} = Registry.start_link(:unique, Registry.ViaTest) name = {:via, Registry, {Registry.ViaTest, "agent"}} {:ok, _} = Agent.start_link(fn -> 0 end, name: name) Agent.get(name, & &1) #=> 0 Agent.update(name, & &1 + 1) Agent.get(name, & &1) #=> 1
Typically the registry is started as part of a supervision tree though:
supervisor(Registry, [:unique, Registry.ViaTest])
Only registries with unique keys can be used in :via
. If the name is already taken, the case-specific start_link
function (Agent.start_link/2
in the example above) will return {:error, {:already_started, current_pid}}
.
Using as a dispatcher
Registry
has a dispatch mechanism that allows developers to implement custom dispatch logic triggered from the caller. For example, let’s say we have a duplicate registry started as so:
{:ok, _} = Registry.start_link(:duplicate, Registry.DispatcherTest)
By calling register/3
, different processes can register under a given key and associate any value under that key. In this case, let’s register the current process under the key "hello"
and attach the {IO, :inspect}
tuple to it:
{:ok, _} = Registry.register(Registry.DispatcherTest, "hello", {IO, :inspect})
Now, an entity interested in dispatching events for a given key may call dispatch/3
passing in the key and a callback. This callback will be invoked with a list of all the values registered under the requested key, alongside the pid of the process that registered each value, in the form of {pid,
value}
tuples. In our example, value
will be the {module, function}
tuple in the code above:
Registry.dispatch(Registry.DispatcherTest, "hello", fn entries -> for {pid, {module, function}} <- entries, do: apply(module, function, [pid]) end) # Prints #PID<...> where the pid is for the process that called register/3 above #=> :ok
Dispatching happens in the process that calls dispatch/3
either serially or concurrently in case of multiple partitions (via spawned tasks). The registered processes are not involved in dispatching unless involving them is done explicitly (for example, by sending them a message in the callback).
Furthermore, if there is a failure when dispatching, due to a bad registration, dispatching will always fail and the registered process will not be notified. Therefore let’s make sure we at least wrap and report those errors:
require Logger Registry.dispatch(Registry.DispatcherTest, "hello", fn entries -> for {pid, {module, function}} <- entries do try do apply(module, function, [pid]) catch kind, reason -> formatted = Exception.format(kind, reason, System.stacktrace) Logger.error "Registry.dispatch/3 failed with #{formatted}" end end end) # Prints #PID<...> #=> :ok
You could also replace the whole apply
system by explicitly sending messages. That’s the example we will see next.
Using as a PubSub
Registries can also be used to implement a local, non-distributed, scalable PubSub by relying on the dispatch/3
function, similarly to the previous section: in this case, however, we will send messages to each associated process, instead of invoking a given module-function.
In this example, we will also set the number of partitions to the number of schedulers online, which will make the registry more performant on highly concurrent environments as each partition will spawn a new process, allowing dispatching to happen in parallel:
{:ok, _} = Registry.start_link(:duplicate, Registry.PubSubTest, partitions: System.schedulers_online) {:ok, _} = Registry.register(Registry.PubSubTest, "hello", []) Registry.dispatch(Registry.PubSubTest, "hello", fn entries -> for {pid, _} <- entries, do: send(pid, {:broadcast, "world"}) end) #=> :ok
The example above broadcasted the message {:broadcast, "world"}
to all processes registered under the “topic” (or “key” as we called it until now) "hello"
.
The third argument given to register/3
is a value associated to the current process. While in the previous section we used it when dispatching, in this particular example we are not interested in it, so we have set it to an empty list. You could store a more meaningful value if necessary.
Registrations
Looking up, dispatching and registering are efficient and immediate at the cost of delayed unsubscription. For example, if a process crashes, its keys are automatically removed from the registry but the change may not propagate immediately. This means certain operations may return processes that are already dead. When such may happen, it will be explicitly stated in the function documentation.
However, keep in mind those cases are typically not an issue. After all, a process referenced by a pid may crash at any time, including between getting the value from the registry and sending it a message. Many parts of the standard library are designed to cope with that, such as Process.monitor/1
which will deliver the :DOWN
message immediately if the monitored process is already dead and Kernel.send/2
which acts as a no-op for dead processes.
ETS
Note that the registry uses one ETS table plus two ETS tables per partition.
Summary
Types
- key()
-
The type of keys allowed on registration
- kind()
-
The type of the registry
- meta_key()
-
The type of registry metadata keys
- meta_value()
-
The type of registry metadata values
- registry()
-
The registry identifier
- value()
-
The type of values allowed on registration
Functions
- dispatch(registry, key, mfa_or_fun)
-
Invokes the callback with all entries under
key
in each partition for the givenregistry
- keys(registry, pid)
-
Returns the known keys for the given
pid
inregistry
in no particular order - lookup(registry, key)
-
Finds the
{pid, value}
pair for the givenkey
inregistry
in no particular order - match(registry, key, pattern)
-
Returns
{pid, value}
pairs under the givenkey
inregistry
that matchpattern
- meta(registry, key)
-
Reads registry metadata given on
start_link/3
- put_meta(registry, key, value)
-
Stores registry metadata
- register(registry, key, value)
-
Registers the current process under the given
key
inregistry
- start_link(kind, registry, options \\ [])
-
Starts the registry as a supervisor process
- unregister(registry, key)
-
Unregisters all entries for the given
key
associated to the current process inregistry
- update_value(registry, key, callback)
-
Updates the value for
key
for the current process in the uniqueregistry
Types
key()
key() :: term()
The type of keys allowed on registration
kind()
kind() :: :unique | :duplicate
The type of the registry
meta_key()
meta_key() :: atom() | tuple()
The type of registry metadata keys
meta_value()
meta_value() :: term()
The type of registry metadata values
registry()
registry() :: atom()
The registry identifier
value()
value() :: term()
The type of values allowed on registration
Functions
dispatch(registry, key, mfa_or_fun)
dispatch(registry(), key(), (entries :: [{pid(), value()}] -> term())) :: :ok
Invokes the callback with all entries under key
in each partition for the given registry
.
The list of entries
is a non-empty list of two-element tuples where the first element is the pid and the second element is the value associated to the pid. If there are no entries for the given key, the callback is never invoked.
If the registry is not partitioned, the callback is invoked in the process that calls dispatch/3
. If the registry is partitioned, the callback is invoked concurrently per partition by starting a task linked to the caller. The callback, however, is only invoked if there are entries for that partition.
See the module documentation for examples of using the dispatch/3
function for building custom dispatching or a pubsub system.
keys(registry, pid)
keys(registry(), pid()) :: [key()]
Returns the known keys for the given pid
in registry
in no particular order.
If the registry is unique, the keys are unique. Otherwise they may contain duplicates if the process was registered under the same key multiple times. The list will be empty if the process is dead or it has no keys in this registry.
Examples
Registering under a unique registry does not allow multiple entries:
iex> Registry.start_link(:unique, Registry.UniqueKeysTest) iex> Registry.keys(Registry.UniqueKeysTest, self()) [] iex> {:ok, _} = Registry.register(Registry.UniqueKeysTest, "hello", :world) iex> Registry.register(Registry.UniqueKeysTest, "hello", :later) # registry is :unique {:error, {:already_registered, self()}} iex> Registry.keys(Registry.UniqueKeysTest, self()) ["hello"]
Such is possible for duplicate registries though:
iex> Registry.start_link(:duplicate, Registry.DuplicateKeysTest) iex> Registry.keys(Registry.DuplicateKeysTest, self()) [] iex> {:ok, _} = Registry.register(Registry.DuplicateKeysTest, "hello", :world) iex> {:ok, _} = Registry.register(Registry.DuplicateKeysTest, "hello", :world) iex> Registry.keys(Registry.DuplicateKeysTest, self()) ["hello", "hello"]
lookup(registry, key)
lookup(registry(), key()) :: [{pid(), value()}]
Finds the {pid, value}
pair for the given key
in registry
in no particular order.
An empty list if there is no match.
For unique registries, a single partition lookup is necessary. For duplicate registries, all partitions must be looked up.
Examples
In the example below we register the current process and look it up both from itself and other processes:
iex> Registry.start_link(:unique, Registry.UniqueLookupTest) iex> Registry.lookup(Registry.UniqueLookupTest, "hello") [] iex> {:ok, _} = Registry.register(Registry.UniqueLookupTest, "hello", :world) iex> Registry.lookup(Registry.UniqueLookupTest, "hello") [{self(), :world}] iex> Task.async(fn -> Registry.lookup(Registry.UniqueLookupTest, "hello") end) |> Task.await [{self(), :world}]
The same applies to duplicate registries:
iex> Registry.start_link(:duplicate, Registry.DuplicateLookupTest) iex> Registry.lookup(Registry.DuplicateLookupTest, "hello") [] iex> {:ok, _} = Registry.register(Registry.DuplicateLookupTest, "hello", :world) iex> Registry.lookup(Registry.DuplicateLookupTest, "hello") [{self(), :world}] iex> {:ok, _} = Registry.register(Registry.DuplicateLookupTest, "hello", :another) iex> Enum.sort(Registry.lookup(Registry.DuplicateLookupTest, "hello")) [{self(), :another}, {self(), :world}]
match(registry, key, pattern)
match(registry(), key(), match_pattern :: atom() | tuple()) :: [{pid(), term()}]
Returns {pid, value}
pairs under the given key
in registry
that match pattern
.
Pattern must be an atom or a tuple that will match the structure of the value stored in the registry. The atom :_
can be used to ignore a given value or tuple element, while :”$1” can be used to temporarily assign part of pattern to a variable for a subsequent comparison.
An empty list will be returned if there is no match.
For unique registries, a single partition lookup is necessary. For duplicate registries, all partitions must be looked up.
Examples
In the example below we register the current process under the same key in a duplicate registry but with different values:
iex> Registry.start_link(:duplicate, Registry.MatchTest) iex> {:ok, _} = Registry.register(Registry.MatchTest, "hello", {1, :atom, 1}) iex> {:ok, _} = Registry.register(Registry.MatchTest, "hello", {2, :atom, 2}) iex> Registry.match(Registry.MatchTest, "hello", {1, :_, :_}) [{self(), {1, :atom, 1}}] iex> Registry.match(Registry.MatchTest, "hello", {2, :_, :_}) [{self(), {2, :atom, 2}}] iex> Registry.match(Registry.MatchTest, "hello", {:_, :atom, :_}) |> Enum.sort() [{self(), {1, :atom, 1}}, {self(), {2, :atom, 2}}] iex> Registry.match(Registry.MatchTest, "hello", {:"$1", :_, :"$1"}) |> Enum.sort() [{self(), {1, :atom, 1}}, {self(), {2, :atom, 2}}]
meta(registry, key)
meta(registry(), meta_key()) :: {:ok, meta_value()} | :error
Reads registry metadata given on start_link/3
.
Atoms and tuples are allowed as keys.
Examples
iex> Registry.start_link(:unique, Registry.MetaTest, meta: [custom_key: "custom_value"]) iex> Registry.meta(Registry.MetaTest, :custom_key) {:ok, "custom_value"} iex> Registry.meta(Registry.MetaTest, :unknown_key) :error
put_meta(registry, key, value)
put_meta(registry(), meta_key(), meta_value()) :: :ok
Stores registry metadata.
Atoms and tuples are allowed as keys.
Examples
iex> Registry.start_link(:unique, Registry.PutMetaTest) iex> Registry.put_meta(Registry.PutMetaTest, :custom_key, "custom_value") :ok iex> Registry.meta(Registry.PutMetaTest, :custom_key) {:ok, "custom_value"} iex> Registry.put_meta(Registry.PutMetaTest, {:tuple, :key}, "tuple_value") :ok iex> Registry.meta(Registry.PutMetaTest, {:tuple, :key}) {:ok, "tuple_value"}
register(registry, key, value)
register(registry(), key(), value()) :: {:ok, pid()} | {:error, {:already_registered, pid()}}
Registers the current process under the given key
in registry
.
A value to be associated with this registration must also be given. This value will be retrieved whenever dispatching or doing a key lookup.
This function returns {:ok, owner}
or {:error, reason}
. The owner
is the pid is the registry partition responsible for the pid. The owner is automatically linked to the caller.
If the registry has unique keys, it will return {:ok, owner}
unless the key is already associated to a pid, in which case it returns {:error, {:already_registered, pid}}
.
If the registry has duplicate keys, multiple registrations from the current process under the same key are allowed.
Examples
Registering under a unique registry does not allow multiple entries:
iex> Registry.start_link(:unique, Registry.UniqueRegisterTest) iex> {:ok, _} = Registry.register(Registry.UniqueRegisterTest, "hello", :world) iex> Registry.register(Registry.UniqueRegisterTest, "hello", :later) {:error, {:already_registered, self()}} iex> Registry.keys(Registry.UniqueRegisterTest, self()) ["hello"]
Such is possible for duplicate registries though:
iex> Registry.start_link(:duplicate, Registry.DuplicateRegisterTest) iex> {:ok, _} = Registry.register(Registry.DuplicateRegisterTest, "hello", :world) iex> {:ok, _} = Registry.register(Registry.DuplicateRegisterTest, "hello", :world) iex> Registry.keys(Registry.DuplicateRegisterTest, self()) ["hello", "hello"]
start_link(kind, registry, options \\ [])
start_link(kind(), registry(), options) :: {:ok, pid()} | {:error, term()} when options: [partitions: pos_integer(), listeners: [atom()], meta: [{meta_key(), meta_value()}]]
Starts the registry as a supervisor process.
Manually it can be started as:
Registry.start_link(:unique, MyApp.Registry)
In your supervisor tree, you would write:
supervisor(Registry, [:unique, MyApp.Registry])
For intensive workloads, the registry may also be partitioned (by specifying the :partitions
option). If partitioning is required then a good default is to set the number of partitions to the number of schedulers available:
Registry.start_link(:unique, MyApp.Registry, partitions: System.schedulers_online())
or:
supervisor(Registry, [:unique, MyApp.Registry, [partitions: System.schedulers_online()]])
Options
The registry supports the following options:
-
:partitions
- the number of partitions in the registry. Defaults to1
. -
:listeners
- a list of named processes which are notified of:register
and:unregister
events. The registered process must be monitored by the listener if the listener wants to be notified if the registered process crashes. -
:meta
- a keyword list of metadata to be attached to the registry.
unregister(registry, key)
unregister(registry(), key()) :: :ok
Unregisters all entries for the given key
associated to the current process in registry
.
Always returns :ok
and automatically unlinks the current process from the owner if there are no more keys associated to the current process. See also register/3
to read more about the “owner”.
Examples
It unregister all entries for key
for unique registries:
iex> Registry.start_link(:unique, Registry.UniqueUnregisterTest) iex> Registry.register(Registry.UniqueUnregisterTest, "hello", :world) iex> Registry.keys(Registry.UniqueUnregisterTest, self()) ["hello"] iex> Registry.unregister(Registry.UniqueUnregisterTest, "hello") :ok iex> Registry.keys(Registry.UniqueUnregisterTest, self()) []
As well as duplicate registries:
iex> Registry.start_link(:duplicate, Registry.DuplicateUnregisterTest) iex> Registry.register(Registry.DuplicateUnregisterTest, "hello", :world) iex> Registry.register(Registry.DuplicateUnregisterTest, "hello", :world) iex> Registry.keys(Registry.DuplicateUnregisterTest, self()) ["hello", "hello"] iex> Registry.unregister(Registry.DuplicateUnregisterTest, "hello") :ok iex> Registry.keys(Registry.DuplicateUnregisterTest, self()) []
update_value(registry, key, callback)
update_value(registry(), key(), (value() -> value())) :: {new_value :: term(), old_value :: term()} | :error
Updates the value for key
for the current process in the unique registry
.
Returns a {new_value, old_value}
tuple or :error
if there is no such key assigned to the current process.
If a non-unique registry is given, an error is raised.
Examples
iex> Registry.start_link(:unique, Registry.UpdateTest) iex> {:ok, _} = Registry.register(Registry.UpdateTest, "hello", 1) iex> Registry.lookup(Registry.UpdateTest, "hello") [{self(), 1}] iex> Registry.update_value(Registry.UpdateTest, "hello", & &1 + 1) {2, 1} iex> Registry.lookup(Registry.UpdateTest, "hello") [{self(), 2}]
© 2012 Plataformatec
Licensed under the Apache License, Version 2.0.
https://hexdocs.pm/elixir/1.4.5/Registry.html