Friday, August 14, 2020

Elixir Event Queue

While learning Elixir, I went looking for the idiomatic way to build an event queue. After a few twists and turns, I found that it's easy, elegant, and pretty well documented; you just need to know what to look for.

There are a number of nifty "job queue" libraries for Elixir (like Honeydew or Oban), but they're directed more toward queueing jobs themselves, rather than enabling a single job to work on a queue of items. What I was looking for was this:

  1. A singleton queue that would have events enqueued from multiple processes (in Elixir-world, this would take the form of an Agent).
  2. A client app running multiple processes that would enqueue events (in my case, a Phoenix app).
  3. A worker process that dequeues a batch of events and processes them (in Elixir-world, this would be a GenServer).
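To make item 1 concrete before the full implementation, here is a minimal, stdlib-only sketch of the core idea: a single Agent holding an Erlang :queue, with several Task processes (standing in for the client app's request processes) enqueueing into it at the same time. It assumes nothing beyond Elixir and OTP; the variable names are illustrative only.

```elixir
# One Agent process owns the queue; every other process talks to it.
{:ok, queue} = Agent.start_link(fn -> :queue.new() end)

# Spawn 10 producers; each pushes 5 events tagged with its own id.
1..10
|> Enum.map(fn producer ->
  Task.async(fn ->
    for n <- 1..5 do
      Agent.update(queue, fn q -> :queue.in({producer, n}, q) end)
    end
  end)
end)
|> Enum.each(&Task.await/1)

# The Agent applies updates one at a time, so no event is lost.
total = Agent.get(queue, &:queue.len/1)
IO.puts("events queued: #{total}")
```

Because the Agent handles each update sequentially inside its own process, concurrent producers never race on the queue; all 50 events end up in it.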

An Example

Following is an example of what I found to be the idiomatic Elixir way of implementing this, with 1) a generic agent that holds a queue as its state (QueueAgent), 2) the bits of a Phoenix app that listen for Phoenix telemetry events and enqueue some data from them onto this queue (RequestListener), and 3) a gen-server worker that dequeues those events and saves them to the DB (RequestSaver). Each of these three components is started by the Phoenix application's supervisor (4).

1. The Queue Agent

The QueueAgent module holds the queue as its state, in the form of a Qex struct. Qex is a wrapper around the native Erlang/OTP :queue module, adding some Elixir syntactic sugar and implementing the Inspect, Collectable, and Enumerable protocols.
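For reference, here are the plain :queue calls underneath the Qex operations used below (push, pop, and split correspond roughly to :queue.in/2, :queue.out/1, and :queue.split/2). This is a standalone sketch using only OTP; note the tagged {:value, item} result, which lets an empty queue be told apart from a stored nil.

```elixir
# Build a queue holding :a, :b, :c (:queue.in/2 adds at the rear).
q = Enum.reduce([:a, :b, :c], :queue.new(), fn item, acc -> :queue.in(item, acc) end)

# :queue.out/1 removes from the front and tags the result.
{{:value, first}, q} = :queue.out(q)

# :queue.split/2 cuts the first N items off into a separate queue.
{front, rest} = :queue.split(1, q)

IO.inspect({first, :queue.to_list(front), :queue.to_list(rest)})
# prints {:a, [:b], [:c]}

# Popping an empty queue returns :empty instead of raising.
IO.inspect(:queue.out(:queue.new()))
# prints {:empty, {[], []}}
```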

The QueueAgent module can pretty much just proxy basic Qex calls through the core Agent get, update, and get_and_update functions. Each of these accepts a function of its own, to which the current state of the agent (the Qex queue) is passed. The function passed to update returns the new state (the updated Qex queue); the function passed to get_and_update returns a two-element tuple of the value to hand back to the caller and the new state.
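A small sketch of those Agent semantics, independent of Qex: for simplicity the state here is a plain list rather than a queue. The point is the shape of the function each call takes, especially the {reply, new_state} tuple returned to get_and_update.

```elixir
{:ok, agent} = Agent.start_link(fn -> [1, 2, 3] end)

# get_and_update/2: the function receives the current state and returns
# {reply, new_state}; the caller gets `reply`, the Agent keeps `new_state`.
head = Agent.get_and_update(agent, fn [h | t] -> {h, t} end)

IO.inspect(head)                   # prints 1
IO.inspect(Agent.get(agent, & &1)) # prints [2, 3]

# update/2: the function returns only the new state; the caller gets :ok.
:ok = Agent.update(agent, fn state -> state ++ [4] end)
IO.inspect(Agent.get(agent, & &1)) # prints [2, 3, 4]
```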

    # lib/my_app/queue_agent.ex
    defmodule MyApp.QueueAgent do
      @moduledoc """
      Agent that holds a queue as its state.
      """
      use Agent

      @doc """
      Starts the agent with the specified options.
      """
      @spec start_link(GenServer.options()) :: Agent.on_start()
      def start_link(opts \\ []) do
        Agent.start_link(&Qex.new/0, opts)
      end

      @doc """
      Returns the length of the queue.
      """
      @spec count(Agent.agent()) :: non_neg_integer
      def count(agent) do
        # count inside the agent process, instead of copying the whole queue out
        Agent.get(agent, &Enum.count/1)
      end

      @doc """
      Enqueues the specified item to the end of the queue.
      """
      @spec push(Agent.agent(), any) :: :ok
      def push(agent, value) do
        Agent.update(agent, &Qex.push(&1, value))
      end

      @doc """
      Dequeues the first item from the front of the queue, and returns it.
      If the queue is empty, returns the specified default value.
      """
      @spec pop(Agent.agent(), any) :: any
      def pop(agent, default \\ nil) do
        case Agent.get_and_update(agent, &Qex.pop/1) do
          {:value, value} -> value
          _ -> default
        end
      end

      @doc """
      Takes the specified number of items off the front of the queue, and returns them.
      If the queue has fewer than the specified number of items, empties the queue and returns all items.
      """
      @spec split(Agent.agent(), non_neg_integer) :: Qex.t()
      def split(agent, max) do
        Agent.get_and_update(agent, fn queue ->
          # clamp the count: the underlying :queue.split/2 raises if asked
          # for more items than the queue holds
          Qex.split(queue, min(Enum.count(queue), max))
        end)
      end
    end
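If you'd rather not pull in Qex, the same agent can be sketched directly on top of :queue. The module name MyApp.PlainQueueAgent is hypothetical, and unlike the Qex version its split returns a plain list; the usage lines at the end also show why split clamps its count before calling :queue.split/2.

```elixir
defmodule MyApp.PlainQueueAgent do
  # Hypothetical stdlib-only variant of QueueAgent: the state is a raw
  # Erlang :queue instead of a Qex struct.
  use Agent

  def start_link(opts \\ []), do: Agent.start_link(&:queue.new/0, opts)

  # Count inside the Agent process so the queue isn't copied to the caller.
  def count(agent), do: Agent.get(agent, &:queue.len/1)

  def push(agent, value), do: Agent.update(agent, &:queue.in(value, &1))

  def pop(agent, default \\ nil) do
    case Agent.get_and_update(agent, &:queue.out/1) do
      {:value, value} -> value
      :empty -> default
    end
  end

  # Returns up to `limit` items from the front as a list.
  def split(agent, limit) do
    Agent.get_and_update(agent, fn queue ->
      {front, rest} = :queue.split(min(:queue.len(queue), limit), queue)
      {:queue.to_list(front), rest}
    end)
  end
end

{:ok, q} = MyApp.PlainQueueAgent.start_link()
Enum.each(1..3, &MyApp.PlainQueueAgent.push(q, &1))
IO.inspect(MyApp.PlainQueueAgent.pop(q))        # prints 1
IO.inspect(MyApp.PlainQueueAgent.split(q, 100)) # prints [2, 3]

# Without the clamp, asking for more items than exist raises.
clamp_needed =
  try do
    :queue.split(5, :queue.from_list([1]))
    false
  rescue
    ArgumentError -> true
  end

IO.inspect(clamp_needed) # prints true
```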

2. The Request Listener

The RequestListener module attaches a Telemetry handler (with the arbitrary ID "my_app_web_request_listener") for one specific event: the "response sent" event, identified by [:phoenix, :endpoint, :stop], which the generated Phoenix endpoint emits through its Plug.Telemetry plug. The listener's handle_event function will be called whenever a response is sent (including error responses), and the response's Plug.Conn struct will be included under the :conn key of the event metadata.

When handling the event, the RequestListener simply enqueues a map of the request details I want to save onto a named QueueAgent queue. The name can be arbitrary; in this example it's MyApp.RequestQueue (which doesn't need to correspond to an existing module). What's important is that a QueueAgent with that name has been started (the application starts it, later on in step 4), and that the RequestSaver (later on in step 3) uses the same name to dequeue events.
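A quick stdlib-only sketch of how the name works, using a bare Agent rather than QueueAgent: any atom can serve as the registered name, and every process that knows it reaches the same Agent. It also shows what happens if nothing is registered under the name yet; the caller exits with :noproc. Inside a telemetry handler that matters, since :telemetry detaches a handler that fails. MyApp.MissingQueue is a hypothetical name chosen to be unregistered.

```elixir
# The name is just an atom; no module called MyApp.RequestQueue has to exist.
{:ok, _pid} = Agent.start_link(fn -> :queue.new() end, name: MyApp.RequestQueue)

# Producer and consumer only need to agree on the name.
Agent.update(MyApp.RequestQueue, &:queue.in(%{path: "/"}, &1))
IO.inspect(Agent.get(MyApp.RequestQueue, &:queue.len/1)) # prints 1

# Calling a name with no registered process makes the caller exit.
result =
  try do
    Agent.update(MyApp.MissingQueue, &:queue.in(:x, &1))
  catch
    :exit, {:noproc, _} -> :noproc
  end

IO.inspect(result) # prints :noproc
```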

    # lib/my_app_web/request_listener.ex
    defmodule MyAppWeb.RequestListener do
      @moduledoc """
      Listens for request telemetry events, and queues them to be saved.
      """
      require Logger

      @response_sent [:phoenix, :endpoint, :stop]
      @events [@response_sent]

      @doc """
      Sets up event listener.
      """
      def setup do
        # a remote capture (Module.fun/arity) avoids telemetry's warning about local function handlers
        :telemetry.attach_many("my_app_web_request_listener", @events, &__MODULE__.handle_event/4, nil)
      end

      @doc """
      Telemetry callback to handle specified event.
      """
      def handle_event(@response_sent, measurement, metadata, _config) do
        handle_response_sent(measurement, metadata, MyApp.RequestQueue)
      end

      @doc """
      Handles Phoenix response sent event.
      """
      def handle_response_sent(measurement, metadata, queue_name) do
        conn = metadata.conn
        reason = conn.assigns[:reason]

        MyApp.QueueAgent.push(queue_name, %{
          inserted_at: DateTime.utc_now(),
          ip: conn.remote_ip,
          request_id: Logger.metadata()[:request_id],
          controller: conn.private[:phoenix_controller],
          action: conn.private[:phoenix_action],
          status: conn.status,
          method: conn.method,
          path: conn.request_path,
          query: conn.query_string,
          # thrown or exited values are not exceptions, so only format real exceptions
          error: if(is_exception(reason), do: Exception.message(reason)),
          # Plug.Telemetry reports duration in :native time units; store nanoseconds
          duration: System.convert_time_unit(measurement.duration, :native, :nanosecond)
        })
      end
    end
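The duration conversion is worth a closer look. Plug.Telemetry measures with System.monotonic_time/0, whose :native unit varies by OS and VM, so storing the raw number and labelling it nanoseconds can be wrong. This standalone sketch times a short sleep the same way and converts explicitly; the sleep is only a stand-in for handling a request.

```elixir
# Measure elapsed time in :native units, the way Plug.Telemetry does.
start = System.monotonic_time()
Process.sleep(20)
duration_native = System.monotonic_time() - start

# Convert explicitly before storing; the size of a :native unit is
# platform-dependent.
duration_ns = System.convert_time_unit(duration_native, :native, :nanosecond)
duration_ms = System.convert_time_unit(duration_native, :native, :millisecond)

IO.puts("#{duration_ns} ns (about #{duration_ms} ms)")
```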

3. The Request Saver

The RequestSaver module runs as a dedicated process, dequeueing batches of up to 100 events and saving each batch. When it's done saving, it "sleeps" for a minute, then tries to dequeue some more events. Everything but the do_work, save_next_batch, save_batch, and batch_changeset functions is boilerplate gen-server functionality for running a process periodically.

The do_work function uses the same MyApp.RequestQueue name as the RequestListener to identify the queue, ensuring that both modules use the same QueueAgent instance. The save_next_batch function dequeues up to 100 events and saves them via the save_batch function (and keeps going until it has emptied the queue). The save_batch and batch_changeset functions build an Ecto.Multi with one insert per event and commit it in a single transaction, using the app's MyApp.RequestEvent schema (not included in this example, but as you can imagine, it would include fields for the various properties that the RequestListener extracted from the event metadata).
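The drain loop in save_next_batch can be tried out without a database. In this sketch, BatchDrain is a hypothetical module that works on a bare Agent holding a :queue, and the "save" step just records each batch's size; with 250 queued items and a batch size of 100, it should save three batches.

```elixir
defmodule BatchDrain do
  # Hypothetical helper: take up to `size` items off the front of an
  # Agent-held :queue and pass each batch to `save_fun`, until empty.
  def drain(agent, size, save_fun) do
    batch =
      Agent.get_and_update(agent, fn q ->
        {front, rest} = :queue.split(min(:queue.len(q), size), q)
        {:queue.to_list(front), rest}
      end)

    case batch do
      [] ->
        :ok

      items ->
        save_fun.(items)
        drain(agent, size, save_fun)
    end
  end
end

{:ok, agent} = Agent.start_link(fn -> :queue.from_list(Enum.to_list(1..250)) end)

# Record batch sizes instead of writing to a database.
{:ok, log} = Agent.start_link(fn -> [] end)
:ok = BatchDrain.drain(agent, 100, fn items -> Agent.update(log, &(&1 ++ [length(items)])) end)

IO.inspect(Agent.get(log, & &1)) # prints [100, 100, 50]
```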

The handle_info callback is the entry point for the gen-server's processing. It ignores the gen-server's state (the worker doesn't need to maintain any state of its own); it simply does some work, then calls schedule_work, which uses Process.send_after to send the process another :work message in a minute.

    # lib/my_app/request_saver.ex
    defmodule MyApp.RequestSaver do
      @moduledoc """
      Saves queued events to the DB.
      """
      use GenServer

      @doc """
      Starts the server with the specified options.
      """
      def start_link(_opts) do
        GenServer.start_link(__MODULE__, %{})
      end

      @doc """
      GenServer callback to start process.
      """
      @impl true
      def init(state) do
        schedule_work()
        {:ok, state}
      end

      @doc """
      GenServer callback to handle process messages.
      """
      @impl true
      def handle_info(:work, state) do
        do_work()
        schedule_work()
        {:noreply, state}
      end

      @doc """
      Does the next unit of work.
      """
      def do_work do
        save_next_batch(MyApp.RequestQueue)
      end

      @doc """
      Pops the next 100 events from the specified queue and saves them,
      repeating until the queue is empty.
      """
      def save_next_batch(queue_name) do
        batch = MyApp.QueueAgent.split(queue_name, 100)

        if Enum.count(batch) > 0 do
          save_batch(batch)
          save_next_batch(queue_name)
        end
      end

      @doc """
      Saves the specified list of events in one big transaction.
      """
      def save_batch(batch) do
        batch
        |> batch_changeset()
        |> MyApp.Repo.transaction()
      end

      @doc """
      Creates an Ecto.Multi from the specified list of events.
      """
      def batch_changeset(batch) do
        batch
        |> Enum.with_index()
        |> Enum.reduce(Ecto.Multi.new(), fn {event, index}, multi ->
          changeset = MyApp.RequestEvent.changeset(event)
          # Multi operation names must be unique; request IDs can be nil or
          # repeat (Plug.RequestId reuses a client-supplied x-request-id)
          Ecto.Multi.insert(multi, {:event, index}, changeset)
        end)
      end

      defp schedule_work do
        # in 1 minute
        Process.send_after(self(), :work, 60 * 1000)
      end
    end
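The periodic part of RequestSaver is a common GenServer pattern that can be exercised on its own. This sketch uses a hypothetical Ticker module with the same init/handle_info/send_after loop, but a 50 ms interval and a counter as its state so the effect is observable quickly.

```elixir
defmodule Ticker do
  # Hypothetical periodic worker: same scheduling loop as RequestSaver,
  # with a short interval and a tick counter as its state.
  use GenServer

  @interval_ms 50

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, 0, opts)

  def ticks(server), do: GenServer.call(server, :ticks)

  @impl true
  def init(count) do
    schedule_work()
    {:ok, count}
  end

  @impl true
  def handle_info(:work, count) do
    # stand-in for do_work()
    count = count + 1
    # scheduled after the work finishes, so the interval counts from the end of each run
    schedule_work()
    {:noreply, count}
  end

  @impl true
  def handle_call(:ticks, _from, count), do: {:reply, count, count}

  defp schedule_work, do: Process.send_after(self(), :work, @interval_ms)
end

{:ok, ticker} = Ticker.start_link()
Process.sleep(180)
IO.inspect(Ticker.ticks(ticker))
```

Since a GenServer handles one message at a time, runs can never overlap; scheduling at the end of handle_info just means a slow run pushes the next one back rather than queueing up extra :work messages.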

4. The Application Supervisor

The above three components are all started in my Phoenix app via the standard Phoenix Application module. On start, it calls the RequestListener setup function, registering the RequestListener to receive Phoenix Telemetry events. It then starts the RequestSaver gen-server as a child process of the app (with no arguments, identified by its own module name), and starts the QueueAgent as another child process, but with a name option, so that it can be identified via the MyApp.RequestQueue name. (The lines added to the boilerplate Phoenix Application module are the RequestListener.setup() call and the MyApp.RequestSaver and MyApp.QueueAgent children.)

    # lib/my_app/application.ex
    defmodule MyApp.Application do
      @moduledoc false
      use Application

      def start(_type, _args) do
        MyAppWeb.RequestListener.setup()

        children = [
          MyApp.Repo,
          # start the queue before the Endpoint, so it exists before the first request is handled
          {MyApp.QueueAgent, name: MyApp.RequestQueue},
          MyAppWeb.Endpoint,
          MyApp.RequestSaver
        ]

        opts = [strategy: :one_for_one, name: MyApp.Supervisor]
        Supervisor.start_link(children, opts)
      end
    end
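Child order matters here because a Supervisor starts its children one after another, in list order, waiting for each start_link to return. The sketch below uses only OTP, with hypothetical Demo.Queue and Demo.Producer modules: the producer pushes to the named queue as soon as it starts (much like an Endpoint handling an early request), which works only because the queue is listed first.

```elixir
defmodule Demo.Queue do
  # Hypothetical stand-in for MyApp.QueueAgent.
  use Agent
  def start_link(opts), do: Agent.start_link(fn -> :queue.new() end, opts)
end

defmodule Demo.Producer do
  # Hypothetical stand-in for a child that enqueues as soon as it starts.
  use GenServer
  def start_link(_opts), do: GenServer.start_link(__MODULE__, nil)

  @impl true
  def init(nil) do
    # Exits with :noproc if Demo.RequestQueue has not been started yet.
    Agent.update(Demo.RequestQueue, &:queue.in(:startup_event, &1))
    {:ok, nil}
  end
end

# The queue is listed first, so it is registered before Demo.Producer runs init/1.
children = [
  {Demo.Queue, name: Demo.RequestQueue},
  Demo.Producer
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
IO.inspect(Agent.get(Demo.RequestQueue, &:queue.len/1)) # prints 1
```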

However, you usually don't want periodic jobs popping up at random while your unit tests run, so I added a little extra logic to avoid starting the RequestSaver in the test environment:

    # lib/my_app/application.ex
    defmodule MyApp.Application do
      @moduledoc false
      use Application

      # evaluated at compile time, since Mix is not available at runtime in a release
      @start_periodic_jobs Mix.env() != :test

      def start(_type, _args) do
        MyAppWeb.RequestListener.setup()

        periodic_jobs =
          if @start_periodic_jobs do
            [MyApp.RequestSaver]
          else
            []
          end

        children =
          [
            MyApp.Repo,
            # start the queue before the Endpoint, so it exists before the first request is handled
            {MyApp.QueueAgent, name: MyApp.RequestQueue},
            MyAppWeb.Endpoint
          ] ++ periodic_jobs

        opts = [strategy: :one_for_one, name: MyApp.Supervisor]
        Supervisor.start_link(children, opts)
      end
    end
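An alternative to checking the Mix environment is to read a flag from application config at runtime, which also works in releases (where Mix isn't loaded). This is a sketch under assumptions: the :start_periodic_jobs key and the Demo.Children module are hypothetical, and atoms stand in for the real child specs.

```elixir
defmodule Demo.Children do
  # Hypothetical: decide from application config whether to include the
  # periodic jobs; defaults to starting them when the key is unset.
  def build(base, periodic) do
    if Application.get_env(:my_app, :start_periodic_jobs, true) do
      base ++ periodic
    else
      base
    end
  end
end

base = [:repo, :queue, :endpoint]
periodic = [:request_saver]

IO.inspect(Demo.Children.build(base, periodic))
# prints [:repo, :queue, :endpoint, :request_saver]

# In a real app this would live in config/test.exs as:
#   config :my_app, start_periodic_jobs: false
Application.put_env(:my_app, :start_periodic_jobs, false)
IO.inspect(Demo.Children.build(base, periodic))
# prints [:repo, :queue, :endpoint]
```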

The overall processing flow of this queueing system, then, works like this:

  1. A request is handled by Phoenix, which emits a "response sent" telemetry event.
  2. The RequestListener handle_event function is called synchronously, within the process that handled the request.
  3. The RequestListener calls the QueueAgent push function to queue the event (which the QueueAgent does within its own internal process).
  4. Once a minute, the RequestSaver process runs the handle_info function, which tries to dequeue the next batch of events via the QueueAgent split function (again with the QueueAgent managing the state update in its own internal process).
  5. The RequestSaver, continuing on in its process, saves any dequeued events to the DB.
