GenEvent behaviour

A behaviour module for implementing event handling functionality.

The event handling model consists of a generic event manager process with an arbitrary number of event handlers which are added and deleted dynamically.

An event manager implemented using this module will have a standard set of interface functions and include functionality for tracing and error reporting. It will also fit into a supervision tree.

Example

There are many use cases for event handlers. For example, a logging system can be built using event handlers where each log message is an event and different event handlers can be attached to handle the log messages. One handler may print error messages on the terminal, another can write them to a file, while a third one can keep the messages in memory (like a buffer) until they are read.

As an example, let’s have a GenEvent that accumulates messages until they are collected by an explicit call.

# Define an Event Handler
defmodule LoggerHandler do
  use GenEvent

  # Callbacks

  def handle_event({:log, x}, messages) do
    {:ok, [x | messages]}
  end

  def handle_call(:messages, messages) do
    {:ok, Enum.reverse(messages), []}
  end
end

# Start a new event manager.
{:ok, pid} = GenEvent.start_link([])

# Attach an event handler to the event manager.
GenEvent.add_handler(pid, LoggerHandler, [])
#=> :ok

# Send some events to the event manager.
GenEvent.notify(pid, {:log, 1})
#=> :ok

GenEvent.notify(pid, {:log, 2})
#=> :ok

# Call functions on specific handlers in the manager.
GenEvent.call(pid, LoggerHandler, :messages)
#=> [1, 2]

GenEvent.call(pid, LoggerHandler, :messages)
#=> []

We start a new event manager by calling GenEvent.start_link/1. Notifications can be sent to the event manager which will then invoke handle_event/2 for each registered handler.

We can add new handlers with add_handler/3 and add_mon_handler/3. Calls can also be made to specific handlers by using call/3.

Callbacks

There are 6 callbacks required to be implemented in a GenEvent. By adding use GenEvent to your module, Elixir will automatically define all 6 callbacks for you, leaving it up to you to implement the ones you want to customize.

Name Registration

A GenEvent is bound to the same name registration rules as a GenServer. Read more about it in the GenServer docs.
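As a minimal sketch of name registration, the snippet below starts a manager under a local name and then refers to it by that name. EventBus is a hypothetical name chosen only for this illustration.

```elixir
# EventBus is a hypothetical registered name used only in this example.
{:ok, pid} = GenEvent.start_link(name: EventBus)

# The registered name can be used wherever a manager reference is expected.
handlers = GenEvent.which_handlers(EventBus)

# Starting a second manager under the same name reports the existing process.
second = GenEvent.start_link(name: EventBus)
```

Here handlers is expected to be an empty list and second is expected to be {:error, {:already_started, pid}}, following the return values described for start_link/1.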

Modes

GenEvent supports three different notification modes.

On GenEvent.ack_notify/2, the manager acknowledges each event, providing backpressure, but processing of the message happens asynchronously.

On GenEvent.sync_notify/2, the manager acknowledges an event just after it is processed by all event handlers.

On GenEvent.notify/2, all events are processed asynchronously and there is no ack (which means there is no backpressure).
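The difference between the three modes can be sketched as follows. EchoHandler is a hypothetical handler that reports every event back to the process stored in its state; it is not part of GenEvent.

```elixir
defmodule EchoHandler do
  use GenEvent

  # The handler state is the pid that should be told about each event.
  def handle_event(event, listener) do
    send(listener, {:handled, event})
    {:ok, listener}
  end
end

{:ok, manager} = GenEvent.start_link([])
:ok = GenEvent.add_handler(manager, EchoHandler, self())

# sync_notify/2 returns only after every handler has run, so the
# handler's message is already in our mailbox when it returns.
:ok = GenEvent.sync_notify(manager, :sync_event)

sync_seen =
  receive do
    {:handled, :sync_event} -> true
  after
    0 -> false
  end

# ack_notify/2 and notify/2 may return before the handlers have run,
# so we wait for the handler's message instead of assuming it is there.
:ok = GenEvent.ack_notify(manager, :ack_event)
:ok = GenEvent.notify(manager, :async_event)

later =
  for expected <- [:ack_event, :async_event] do
    receive do
      {:handled, ^expected} -> expected
    after
      1_000 -> :timeout
    end
  end
```

Only the sync_notify/2 case can be checked with a zero timeout; for the other two modes the caller has to wait if it depends on the handlers having finished.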

Streaming

GenEvent messages can be streamed with the help of stream/2. You will need to start another process to consume the stream:

Task.start_link fn ->
  stream = GenEvent.stream(pid)

  # Discard the next 3 events
  _ = Enum.take(stream, 3)

  # Print all remaining events
  for event <- stream do
    IO.inspect event
  end
end

Now call GenEvent.notify/2 multiple times. You will see that the first three events are skipped while the rest are printed continuously.

Learn more and compatibility

If you wish to find out more about GenEvent, the documentation and links in Erlang can provide extra insight.

Keep in mind though Elixir and Erlang gen events are not 100% compatible. The :gen_event.add_sup_handler/3 is not supported by Elixir’s GenEvent, which in turn supports GenEvent.add_mon_handler/3.

The benefits of the monitoring approach are described in the “Don’t drink too much kool aid” section of the “Learn you some Erlang” link above. Due to those changes, Elixir’s GenEvent does not trap exits by default.

Furthermore, Elixir also normalizes the {:error, _} tuples returned by many functions, in order to be more consistent with themselves and the GenServer module.

Summary

Types

handler()

Supported values for new handlers

manager()

The event manager reference

name()

The GenEvent manager name

on_start()

Return values of start* functions

options()

Options used by the start* functions

Functions

ack_notify(manager, event)

Sends an ack event notification to the event manager

add_handler(manager, handler, args)

Adds a new event handler to the event manager

add_mon_handler(manager, handler, args)

Adds a monitored event handler to the event manager

call(manager, handler, request, timeout \\ 5000)

Makes a synchronous call to the event handler installed in manager

notify(manager, event)

Sends an event notification to the event manager

remove_handler(manager, handler, args)

Removes an event handler from the event manager

start(options \\ [])

Starts an event manager process without links (outside of a supervision tree)

start_link(options \\ [])

Starts an event manager linked to the current process

stop(manager, reason \\ :normal, timeout \\ :infinity)

Stops the manager with the given reason

stream(manager, options \\ [])

Returns a stream that consumes events from the manager

swap_handler(manager, handler1, args1, handler2, args2)

Replaces an old event handler with a new one in the event manager

swap_mon_handler(manager, handler1, args1, handler2, args2)

Replaces an old event handler with a new monitored one in the event manager

sync_notify(manager, event)

Sends a sync event notification to the event manager

which_handlers(manager)

Returns a list of all event handlers installed in the manager

Callbacks

code_change(old_vsn, state, extra)

Invoked to change the state of the handler when a different version of the handler’s module is loaded (hot code swapping) and the state’s term structure should be changed

handle_call(request, state)

Invoked to handle synchronous call/4 messages to a specific handler

handle_event(event, state)

Invoked to handle notify/2, ack_notify/2 or sync_notify/2 messages

handle_info(msg, state)

Invoked to handle all other messages. All handlers are run in the GenEvent process so messages intended for other handlers should be ignored with a catch all clause

init(args)

Invoked when the handler is added to the GenEvent process. add_handler/3 (and add_mon_handler/3) will block until it returns

terminate(reason, state)

Invoked when the server is about to exit. It should do any cleanup required

Types

handler()

handler() :: atom() | {atom(), term()}

Supported values for new handlers

manager()

manager() :: pid() | name() | {atom(), node()}

The event manager reference

name()

name() :: atom() | {:global, term()} | {:via, module(), term()}

The GenEvent manager name

on_start()

on_start() :: {:ok, pid()} | {:error, {:already_started, pid()}}

Return values of start* functions

options()

options() :: [{:name, name()}]

Options used by the start* functions

Functions

ack_notify(manager, event)

ack_notify(manager(), term()) :: :ok

Sends an ack event notification to the event manager.

In other words, this function returns :ok as soon as the event manager starts processing the event; it does not wait for the event handlers to process it.

See notify/2 for more info. Note this function is specific to Elixir’s GenEvent and does not work with Erlang ones.

add_handler(manager, handler, args)

add_handler(manager(), handler(), term()) :: :ok | {:error, term()}

Adds a new event handler to the event manager.

The event manager will call the init/1 callback with args to initiate the event handler and its internal state.

If init/1 returns a correct value indicating successful completion, the event manager adds the event handler and this function returns :ok. If the callback fails with reason or returns {:error, reason}, the event handler is ignored and this function returns {:error, reason}.

If the given handler was previously installed at the manager, this function returns {:error, :already_present}.

To install multiple instances of the same handler, {Module, id} must be used instead of Module. The handler can then be referenced with {Module, id} instead of just Module.
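A short sketch of installing two instances of one module under different ids. CounterHandler and the ids :a and :b are hypothetical names used only for this example.

```elixir
defmodule CounterHandler do
  use GenEvent

  # Each instance keeps its own counter as state.
  def handle_event(_event, count), do: {:ok, count + 1}

  def handle_call(:count, count), do: {:ok, count, count}
end

{:ok, manager} = GenEvent.start_link([])

# Two independent instances of the same module, told apart by their id.
:ok = GenEvent.add_handler(manager, {CounterHandler, :a}, 0)
:ok = GenEvent.add_handler(manager, {CounterHandler, :b}, 100)

# Installing an id that is already present is rejected.
duplicate = GenEvent.add_handler(manager, {CounterHandler, :a}, 0)

:ok = GenEvent.sync_notify(manager, :tick)

count_a = GenEvent.call(manager, {CounterHandler, :a}, :count)
count_b = GenEvent.call(manager, {CounterHandler, :b}, :count)
```

Both instances receive the :tick event, but each one updates only its own state.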

add_mon_handler(manager, handler, args)

add_mon_handler(manager(), handler(), term()) ::
  :ok |
  {:error, term()}

Adds a monitored event handler to the event manager.

Expects the same input and returns the same values as add_handler/3.

Monitored handlers

A monitored handler implies the calling process will now be monitored by the GenEvent manager.

If the calling process later terminates with reason, the event manager will delete the event handler by calling the terminate/2 callback with {:stop, reason} as argument. If the event handler is later deleted, the event manager sends a message {:gen_event_EXIT, handler, reason} to the calling process. Reason is one of the following:

  • :normal - if the event handler has been removed due to a call to remove_handler/3, or :remove_handler has been returned by a callback function

  • :shutdown - if the event handler has been removed because the event manager is terminating

  • {:swapped, new_handler, pid} - if the process pid has replaced the event handler with another

  • term - if the event handler is removed due to an error. Which term depends on the error

Keep in mind that the {:gen_event_EXIT, handler, reason} message is not guaranteed to be delivered in case the manager crashes. If you want to guarantee the message is delivered, you have two options:

  • monitor the event manager
  • link to the event manager and then set Process.flag(:trap_exit, true) in your handler callback

Finally, this functionality only works with GenEvent started via this module (it is not backwards compatible with Erlang’s :gen_event).
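The notification sent to the calling process can be observed with a sketch like the one below. WatchedHandler is a hypothetical handler that relies entirely on the default callbacks injected by use GenEvent.

```elixir
defmodule WatchedHandler do
  # All callbacks come from the defaults provided by `use GenEvent`.
  use GenEvent
end

{:ok, manager} = GenEvent.start_link([])

# The calling process (this one) is now monitored by the manager.
:ok = GenEvent.add_mon_handler(manager, WatchedHandler, [])

# Removing the handler makes the manager notify the process that added it.
:ok = GenEvent.remove_handler(manager, WatchedHandler, :no_longer_needed)

exit_message =
  receive do
    {:gen_event_EXIT, WatchedHandler, reason} -> {:removed, reason}
  after
    1_000 -> :timeout
  end
```

Because the handler was removed through remove_handler/3, the reason in the message is expected to be :normal, as listed above.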

call(manager, handler, request, timeout \\ 5000)

call(manager(), handler(), term(), timeout()) ::
  term() |
  {:error, term()}

Makes a synchronous call to the event handler installed in manager.

The given request is sent and the caller waits until a reply arrives or a timeout occurs. The event manager will call handle_call/2 to handle the request.

The return value reply is defined in the return value of handle_call/2. If the specified event handler is not installed, the function returns {:error, :not_found}.
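A small sketch of both outcomes, assuming a hypothetical PingHandler module and an equally hypothetical MissingHandler name that is never installed:

```elixir
defmodule PingHandler do
  use GenEvent

  # Reply with :pong and leave the state untouched.
  def handle_call(:ping, state), do: {:ok, :pong, state}
end

{:ok, manager} = GenEvent.start_link([])
:ok = GenEvent.add_handler(manager, PingHandler, [])

# The fourth argument is the timeout in milliseconds (5000 if omitted).
reply = GenEvent.call(manager, PingHandler, :ping, 1_000)

# MissingHandler was never added to this manager.
missing = GenEvent.call(manager, MissingHandler, :ping)
```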

notify(manager, event)

notify(manager(), term()) :: :ok

Sends an event notification to the event manager.

The event manager will call handle_event/2 for each installed event handler.

notify is asynchronous and will return immediately after the notification is sent. notify will not fail even if the specified event manager does not exist, unless it is specified as an atom.

remove_handler(manager, handler, args)

remove_handler(manager(), handler(), term()) ::
  term() |
  {:error, term()}

Removes an event handler from the event manager.

The event manager will call terminate/2 to terminate the event handler and return the callback value. If the specified event handler is not installed, the function returns {:error, :not_found}.
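To illustrate how the value returned by terminate/2 reaches the caller, here is a sketch with a hypothetical BufferHandler whose :flush argument is an arbitrary term chosen for this example.

```elixir
defmodule BufferHandler do
  use GenEvent

  def handle_event(event, buffer), do: {:ok, [event | buffer]}

  # The return value is handed back to the caller of remove_handler/3.
  def terminate(:flush, buffer), do: {:flushed, Enum.reverse(buffer)}
  def terminate(_reason, _buffer), do: :ok
end

{:ok, manager} = GenEvent.start_link([])
:ok = GenEvent.add_handler(manager, BufferHandler, [])
:ok = GenEvent.sync_notify(manager, :a)
:ok = GenEvent.sync_notify(manager, :b)

# The third argument becomes the first argument of terminate/2.
flushed = GenEvent.remove_handler(manager, BufferHandler, :flush)

# A second removal finds no such handler.
missing = GenEvent.remove_handler(manager, BufferHandler, :flush)
```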

start(options \\ [])

start(options()) :: on_start()

Starts an event manager process without links (outside of a supervision tree).

See start_link/1 for more information.

start_link(options \\ [])

start_link(options()) :: on_start()

Starts an event manager linked to the current process.

This is often used to start the GenEvent as part of a supervision tree.

It accepts the :name option which is described under the Name Registration section in the GenServer module docs.

If the event manager is successfully created and initialized, the function returns {:ok, pid}, where pid is the PID of the server. If a process with the specified server name already exists, the function returns {:error, {:already_started, pid}} with the PID of that process.

Note that a GenEvent started with start_link/1 is linked to the parent process and will exit not only on crashes but also if the parent process exits with :normal reason.

stop(manager, reason \\ :normal, timeout \\ :infinity)

stop(manager(), reason :: term(), timeout()) :: :ok

Stops the manager with the given reason.

Before terminating, the event manager will call terminate(:stop, ...) for each installed event handler. It returns :ok if the manager terminates with the given reason. If it terminates with another reason, the call will exit.

This function keeps OTP semantics regarding error reporting. If the reason is any other than :normal, :shutdown or {:shutdown, _}, an error report will be logged.
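The shutdown sequence can be sketched as follows. FarewellHandler is a hypothetical handler that tells the process stored in its state which reason terminate/2 received.

```elixir
defmodule FarewellHandler do
  use GenEvent

  # Report the termination reason to the listener held in the state.
  def terminate(reason, listener) do
    send(listener, {:terminated, reason})
    :ok
  end
end

{:ok, manager} = GenEvent.start_link([])
:ok = GenEvent.add_handler(manager, FarewellHandler, self())

# Stops with reason :normal, so the linked caller is not taken down.
:ok = GenEvent.stop(manager)

terminate_reason =
  receive do
    {:terminated, reason} -> reason
  after
    1_000 -> :timeout
  end

still_alive = Process.alive?(manager)
```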

stream(manager, options \\ [])

stream(manager(), Keyword.t()) :: GenEvent.Stream.t()

Returns a stream that consumes events from the manager.

The stream is a GenEvent.Stream struct that implements the Enumerable protocol. Consumption of events only begins when enumeration starts.

Note streaming is specific to Elixir’s GenEvent and does not work with Erlang ones.

Options

  • :timeout - raises if no event arrives in X milliseconds (defaults to :infinity)

swap_handler(manager, handler1, args1, handler2, args2)

swap_handler(manager(), handler(), term(), handler(), term()) ::
  :ok |
  {:error, term()}

Replaces an old event handler with a new one in the event manager.

First, the event manager deletes the old event handler by calling terminate/2 with the given args1 and collects the return value. Then the new event handler is added and initiated by calling init({args2, term}), where term is the return value of calling terminate/2 in the old handler. This makes it possible to transfer information from one handler to another.

The new handler will be added even if the specified old event handler is not installed, or if the old handler fails to terminate with a given reason. In both cases the term passed to init/1 is {:error, term}.

If init/1 in the second handler returns a correct value, this function returns :ok.
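The state hand-over during a swap can be sketched like this. OldHandler and NewHandler are hypothetical modules, and :swap is an arbitrary term passed as args1.

```elixir
defmodule OldHandler do
  use GenEvent

  def handle_event(event, acc), do: {:ok, [event | acc]}

  # Whatever terminate/2 returns is passed on to the new handler's init/1.
  def terminate(_args, acc), do: Enum.reverse(acc)
end

defmodule NewHandler do
  use GenEvent

  # On a swap, init/1 receives {args2, return_value_of_old_terminate}.
  def init({_args2, inherited}), do: {:ok, inherited}

  def handle_call(:state, state), do: {:ok, state, state}
end

{:ok, manager} = GenEvent.start_link([])
:ok = GenEvent.add_handler(manager, OldHandler, [])
:ok = GenEvent.sync_notify(manager, :a)
:ok = GenEvent.sync_notify(manager, :b)

swap_result = GenEvent.swap_handler(manager, OldHandler, :swap, NewHandler, [])

inherited_state = GenEvent.call(manager, NewHandler, :state)
installed = GenEvent.which_handlers(manager)
```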

swap_mon_handler(manager, handler1, args1, handler2, args2)

swap_mon_handler(manager(), handler(), term(), handler(), term()) ::
  :ok |
  {:error, term()}

Replaces an old event handler with a new monitored one in the event manager.

Read the docs for add_mon_handler/3 and swap_handler/5 for more information.

sync_notify(manager, event)

sync_notify(manager(), term()) :: :ok

Sends a sync event notification to the event manager.

In other words, this function only returns :ok after the event manager invokes the handle_event/2 callback on each installed event handler.

See notify/2 for more info.

which_handlers(manager)

which_handlers(manager()) :: [handler()]

Returns a list of all event handlers installed in the manager.

Callbacks

code_change(old_vsn, state, extra)

code_change(old_vsn, state :: term(), extra :: term()) :: {:ok, new_state :: term()} when old_vsn: term() | {:down, term()}

Invoked to change the state of the handler when a different version of the handler’s module is loaded (hot code swapping) and the state’s term structure should be changed.

old_vsn is the previous version of the module (defined by the @vsn attribute) when upgrading. When downgrading the previous version is wrapped in a 2-tuple with first element :down. state is the current state of the handler and extra is any extra data required to change the state.

Returning {:ok, new_state} changes the state to new_state and the code change is successful.

If code_change/3 raises, the code change fails and the handler will continue with its previous state. Therefore this callback does not usually contain side effects.

handle_call(request, state)

handle_call(request :: term(), state :: term()) ::
  {:ok, reply, new_state} |
  {:ok, reply, new_state, :hibernate} |
  {:remove_handler, reply} when reply: term(), new_state: term()

Invoked to handle synchronous call/4 messages to a specific handler.

request is the request message sent by a call/4 and state is the current state of the handler.

Returning {:ok, reply, new_state} sends reply as a response to the call and sets the handler’s state to new_state.

Returning {:ok, reply, new_state, :hibernate} is similar to {:ok, reply, new_state} except the process is hibernated. See handle_event/2 for more information on hibernation.

Returning {:remove_handler, reply} sends reply as a response to the call, removes the handler from the GenEvent loop and calls terminate/2 with reason :remove_handler and state state.

handle_event(event, state)

handle_event(event :: term(), state :: term()) ::
  {:ok, new_state} |
  {:ok, new_state, :hibernate} |
  :remove_handler when new_state: term()

Invoked to handle notify/2, ack_notify/2 or sync_notify/2 messages.

event is the event message and state is the current state of the handler.

Returning {:ok, new_state} sets the handler’s state to new_state and the GenEvent loop continues.

Returning {:ok, new_state, :hibernate} is similar to {:ok, new_state} except the process is hibernated once all handlers have handled the events. The GenEvent process will continue the loop once a message is in its message queue. If a message is already in the message queue, this happens immediately. Hibernating a GenEvent causes garbage collection and leaves a continuous heap that minimises the memory used by the process.

Hibernating should not be used aggressively as too much time could be spent garbage collecting. Normally it should only be used when a message is not expected soon and minimising the memory of the process is shown to be beneficial.

Returning :remove_handler removes the handler from the GenEvent loop and calls terminate/2 with reason :remove_handler and state state.
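As a sketch of a handler that removes itself, the hypothetical OneShotHandler below handles a single event and then returns :remove_handler.

```elixir
defmodule OneShotHandler do
  use GenEvent

  # Forward the first event to the listener, then ask to be removed.
  def handle_event(event, listener) do
    send(listener, {:first_event, event})
    :remove_handler
  end
end

{:ok, manager} = GenEvent.start_link([])
:ok = GenEvent.add_handler(manager, OneShotHandler, self())

before_event = GenEvent.which_handlers(manager)
:ok = GenEvent.sync_notify(manager, :hello)
after_event = GenEvent.which_handlers(manager)
```

The manager itself keeps running after the handler is gone, so later events are simply delivered to whatever handlers remain.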

handle_info(msg, state)

handle_info(msg :: term(), state :: term()) ::
  {:ok, new_state} |
  {:ok, new_state, :hibernate} |
  :remove_handler when new_state: term()

Invoked to handle all other messages. All handlers are run in the GenEvent process so messages intended for other handlers should be ignored with a catch all clause.

msg is the message and state is the current state of the handler.

Return values are the same as handle_event/2.

init(args)

init(args :: term()) ::
  {:ok, state} |
  {:ok, state, :hibernate} |
  {:error, reason :: any()} when state: any()

Invoked when the handler is added to the GenEvent process. add_handler/3 (and add_mon_handler/3) will block until it returns.

args is the argument term (third argument) passed to add_handler/3.

Returning {:ok, state} will cause add_handler/3 to return :ok and the handler to become part of the GenEvent loop with state state.

Returning {:ok, state, :hibernate} is similar to {:ok, state} except the GenEvent process is hibernated before continuing its loop. See handle_event/2 for more information on hibernation.

Returning {:error, reason} will cause add_handler/3 to return {:error, reason} and the handler is not added to the GenEvent loop.
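The successful and failing return values of init/1 can be compared with a sketch like the following. PickyHandler and its :invalid_limit error term are hypothetical and exist only for this illustration.

```elixir
defmodule PickyHandler do
  use GenEvent

  # Accept only positive integers as the initial argument.
  def init(limit) when is_integer(limit) and limit > 0, do: {:ok, limit}
  def init(other), do: {:error, {:invalid_limit, other}}
end

{:ok, manager} = GenEvent.start_link([])

accepted = GenEvent.add_handler(manager, PickyHandler, 10)

# A second instance with a bad argument is rejected by init/1.
rejected = GenEvent.add_handler(manager, {PickyHandler, :second}, :oops)

installed = GenEvent.which_handlers(manager)
```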

terminate(reason, state)

terminate(reason, state :: term()) :: term() when reason: :stop | {:stop, term()} | :remove_handler | {:error, term()} | term()

Invoked when the server is about to exit. It should do any cleanup required.

reason is the removal reason and state is the current state of the handler. The return value is returned to GenEvent.remove_handler/3 or ignored if removing for another reason.

reason is one of:

  • :stop - manager is terminating
  • {:stop, term} - monitored process terminated (for monitored handlers)
  • :remove_handler - handler is being removed
  • {:error, term} - handler crashed or returned a bad value and an error is logged
  • term - any term passed to functions like GenEvent.remove_handler/3

If part of a supervision tree, a GenEvent’s Supervisor will send an exit signal when shutting it down. The exit signal is based on the shutdown strategy in the child’s specification. If it is :brutal_kill the GenEvent is killed and so terminate/2 is not called for its handlers. However if it is a timeout the Supervisor will send the exit signal :shutdown and the GenEvent will have the duration of the timeout to call terminate/2 on all of its handlers - if the process is still alive after the timeout it is killed.

If the GenEvent receives an exit signal (that is not :normal) from any process when it is not trapping exits it will exit abruptly with the same reason and so not call the handlers’ terminate/2. Note that a process does NOT trap exits by default and an exit signal is sent when a linked process exits or its node is disconnected. Therefore it is not guaranteed that terminate/2 is called when a GenEvent exits.

Care should be taken to clean up because the GenEvent can continue to loop after removing the handler. This is different to most other OTP behaviours. For example, if the handler controls a port (e.g. :gen_tcp.socket) or File.io_device/0, it will need to be closed in terminate/2, as the process is not exiting and so the resource will not be automatically cleaned up.
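A minimal sketch of such cleanup, assuming a hypothetical FileLogHandler that writes each event to a file in the system temporary directory. The module name and file name are made up for this example.

```elixir
defmodule FileLogHandler do
  use GenEvent

  # File.open/2 returns {:ok, device} or {:error, reason}, which are
  # both valid init/1 return values; the device becomes the state.
  def init(path), do: File.open(path, [:write])

  def handle_event(event, device) do
    IO.write(device, "#{inspect(event)}\n")
    {:ok, device}
  end

  # The manager keeps running after removal, so close the file explicitly.
  def terminate(_reason, device), do: File.close(device)
end

file_name = "gen_event_example_#{System.unique_integer([:positive])}.log"
path = Path.join(System.tmp_dir!(), file_name)

{:ok, manager} = GenEvent.start_link([])
:ok = GenEvent.add_handler(manager, FileLogHandler, path)
:ok = GenEvent.sync_notify(manager, {:log, 1})

# terminate/2 closes the device and its result is returned here.
close_result = GenEvent.remove_handler(manager, FileLogHandler, :done)

contents = File.read!(path)
File.rm(path)
```

The file is opened by the manager process (init/1 runs there), so without the explicit File.close/1 it would stay open for as long as the manager lives.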

© 2012 Plataformatec
Licensed under the Apache License, Version 2.0.
https://hexdocs.pm/elixir/1.4.5/GenEvent.html